Extract core Prometheus value decoders.

Bernerd had suggested extracting the value decoders and bundling them
into the client library.  After some reflection, I tend to agree with
this, since we can start breaking the onion of Prometheus itself and
localize the protocol management into its own scope.

A couple of major changes since moving:

- Protocol 0.0.2 has moved to a struct{} so that our tests can perform
  value matching, which cannot be done against function literals.

- Processing now acquires options to dictate behavioral changes of
  metrics bodies.

- Processing no longer closes the stream, thusly returning this to the
  hands of the caller.

- Process() has been renamed to ProcessSingle to better convey that it
  works on complete message bodies.  This paves the way for better
  streaming payload support that the next API version will offer.
This commit is contained in:
Matt T. Proud 2013-06-08 11:48:16 +02:00
parent c10c61ce05
commit 85899b3f4a
15 changed files with 1472 additions and 1 deletions

View File

@ -1,4 +1,7 @@
language: go
go:
- 1.1
script:
- make -f Makefile.TRAVIS

View File

@ -0,0 +1,15 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package decoding decodes Prometheus clients' data streams for consumers.
package decoding

View File

@ -0,0 +1,54 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"fmt"
"mime"
"net/http"
)
// ProcessorForRequestHeader interprets a HTTP request header to determine
// what Processor should be used for the given input. If no acceptable
// Processor can be found, an error is returned.
func ProcessorForRequestHeader(header http.Header) (Processor, error) {
if header == nil {
return nil, fmt.Errorf("Received illegal and nil header.")
}
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
}
if mediatype != "application/json" {
return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
}
var prometheusApiVersion string
if params["schema"] == "prometheus/telemetry" && params["version"] != "" {
prometheusApiVersion = params["version"]
} else {
prometheusApiVersion = header.Get("X-Prometheus-API-Version")
}
switch prometheusApiVersion {
case "0.0.2":
return Processor002, nil
case "0.0.1":
return Processor001, nil
default:
return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
}
}

View File

@ -0,0 +1,96 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"fmt"
"net/http"
"testing"
)
func testDiscriminatorHttpHeader(t tester) {
var scenarios = []struct {
input map[string]string
output Processor
err error
}{
{
output: nil,
err: fmt.Errorf("Received illegal and nil header."),
},
{
input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.0"},
output: nil,
err: fmt.Errorf("Unrecognized API version 0.0.0"),
},
{
input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.1"},
output: Processor001,
err: nil,
},
{
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.0`},
output: nil,
err: fmt.Errorf("Unrecognized API version 0.0.0"),
},
{
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.1`},
output: Processor001,
err: nil,
},
{
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.2`},
output: Processor002,
err: nil,
},
}
for i, scenario := range scenarios {
var header http.Header
if len(scenario.input) > 0 {
header = http.Header{}
}
for key, value := range scenario.input {
header.Add(key, value)
}
actual, err := ProcessorForRequestHeader(header)
if scenario.err != err {
if scenario.err != nil && err != nil {
if scenario.err.Error() != err.Error() {
t.Errorf("%d. expected %s, got %s", i, scenario.err, err)
}
} else if scenario.err != nil || err != nil {
t.Errorf("%d. expected %s, got %s", i, scenario.err, err)
}
}
if scenario.output != actual {
t.Errorf("%d. expected %s, got %s", i, scenario.output, actual)
}
}
}
func TestDiscriminatorHttpHeader(t *testing.T) {
testDiscriminatorHttpHeader(t)
}
func BenchmarkDiscriminatorHttpHeader(b *testing.B) {
for i := 0; i < b.N; i++ {
testDiscriminatorHttpHeader(b)
}
}

View File

View File

@ -0,0 +1,79 @@
[
{
"baseLabels": {
"name": "rpc_calls_total",
"job": "batch_job"
},
"docstring": "RPC calls.",
"metric": {
"type": "counter",
"value": [
{
"labels": {
"service": "zed"
},
"value": 25
},
{
"labels": {
"service": "bar"
},
"value": 25
},
{
"labels": {
"service": "foo"
},
"value": 25
}
]
}
},
{
"baseLabels": {
"name": "rpc_latency_microseconds"
},
"docstring": "RPC latency.",
"metric": {
"type": "histogram",
"value": [
{
"labels": {
"service": "foo"
},
"value": {
"0.010000": 15.890724674774395,
"0.050000": 15.890724674774395,
"0.500000": 84.63044031436561,
"0.900000": 160.21100853053224,
"0.990000": 172.49828748957728
}
},
{
"labels": {
"service": "zed"
},
"value": {
"0.010000": 0.0459814091918713,
"0.050000": 0.0459814091918713,
"0.500000": 0.6120456642749681,
"0.900000": 1.355915069887731,
"0.990000": 1.772733213161236
}
},
{
"labels": {
"service": "bar"
},
"value": {
"0.010000": 78.48563317257356,
"0.050000": 78.48563317257356,
"0.500000": 97.31798360385088,
"0.900000": 109.89202084295582,
"0.990000": 109.99626121011262
}
}
]
}
}
]

View File

@ -0,0 +1,38 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
type tester interface {
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
}
// errorEqual compares Go errors for equality.
func errorEqual(left, right error) bool {
if left == right {
return true
}
if left != nil && right != nil {
if left.Error() == right.Error() {
return true
}
return false
}
return false
}

View File

@ -0,0 +1,287 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"encoding/binary"
"fmt"
"hash/fnv"
"sort"
"strings"
"time"
)
type Sample struct {
Metric Metric
Value SampleValue
Timestamp time.Time
}
func (s *Sample) Equal(o *Sample) bool {
if !s.Metric.Equal(o.Metric) {
return false
}
if !s.Timestamp.Equal(o.Timestamp) {
return false
}
if !s.Value.Equal(o.Value) {
return false
}
return true
}
type Samples []*Sample
func (s Samples) Len() int {
return len(s)
}
func (s Samples) Less(i, j int) bool {
switch {
case s[i].Metric.Before(s[j].Metric):
return true
case s[i].Timestamp.Before(s[j].Timestamp):
return true
default:
return false
}
}
func (s Samples) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet
// may be fully-qualified down to the point where it may resolve to a single
// Metric in the data store or not. All operations that occur within the realm
// of a LabelSet can emit a vector of Metric entities to which the LabelSet may
// match.
type LabelSet map[LabelName]LabelValue
// Helper function to non-destructively merge two label sets.
func (l LabelSet) Merge(other LabelSet) LabelSet {
result := make(LabelSet, len(l))
for k, v := range l {
result[k] = v
}
for k, v := range other {
result[k] = v
}
return result
}
func (l LabelSet) String() string {
labelStrings := make([]string, 0, len(l))
for label, value := range l {
labelStrings = append(labelStrings, fmt.Sprintf("%s='%s'", label, value))
}
sort.Strings(labelStrings)
return fmt.Sprintf("{%s}", strings.Join(labelStrings, ", "))
}
// A LabelValue is an associated value for a LabelName.
type LabelValue string
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
// a singleton and refers to one and only one stream of samples.
type Metric map[LabelName]LabelValue
func (m Metric) Equal(o Metric) bool {
lFingerprint := &Fingerprint{}
rFingerprint := &Fingerprint{}
m.WriteFingerprint(lFingerprint)
o.WriteFingerprint(rFingerprint)
return lFingerprint.Equal(rFingerprint)
}
func (m Metric) Before(o Metric) bool {
lFingerprint := &Fingerprint{}
rFingerprint := &Fingerprint{}
m.WriteFingerprint(lFingerprint)
o.WriteFingerprint(rFingerprint)
return m.Before(o)
}
func (m Metric) WriteFingerprint(f *Fingerprint) {
labelLength := len(m)
labelNames := make([]string, 0, labelLength)
for labelName := range m {
labelNames = append(labelNames, string(labelName))
}
sort.Strings(labelNames)
summer := fnv.New64a()
firstCharacterOfFirstLabelName := ""
lastCharacterOfLastLabelValue := ""
labelMatterLength := 0
for i, labelName := range labelNames {
labelValue := m[LabelName(labelName)]
labelNameLength := len(labelName)
labelValueLength := len(labelValue)
labelMatterLength += labelNameLength + labelValueLength
if i == 0 {
firstCharacterOfFirstLabelName = labelName[0:1]
}
if i == labelLength-1 {
lastCharacterOfLastLabelValue = string(labelValue[labelValueLength-1 : labelValueLength])
}
fmt.Fprintf(summer, "%s%s%s", labelName, `"`, labelValue)
}
f.firstCharacterOfFirstLabelName = firstCharacterOfFirstLabelName
f.hash = binary.LittleEndian.Uint64(summer.Sum(nil))
f.labelMatterLength = uint(labelMatterLength % 10)
f.lastCharacterOfLastLabelValue = lastCharacterOfLastLabelValue
}
// A SampleValue is a representation of a value for a given sample at a given
// time.
type SampleValue float64
func (v SampleValue) Equal(o SampleValue) bool {
return v == o
}
func (v SampleValue) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%f"`, v)), nil
}
func (v SampleValue) String() string {
return fmt.Sprint(float64(v))
}
// Fingerprint provides a hash-capable representation of a Metric.
type Fingerprint struct {
// A hashed representation of the underyling entity. For our purposes, FNV-1A
// 64-bit is used.
hash uint64
firstCharacterOfFirstLabelName string
labelMatterLength uint
lastCharacterOfLastLabelValue string
}
func (f *Fingerprint) String() string {
return f.ToRowKey()
}
// Transforms the Fingerprint into a database row key.
func (f *Fingerprint) ToRowKey() string {
return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, "-")
}
func (f *Fingerprint) Hash() uint64 {
return f.hash
}
func (f *Fingerprint) FirstCharacterOfFirstLabelName() string {
return f.firstCharacterOfFirstLabelName
}
func (f *Fingerprint) LabelMatterLength() uint {
return f.labelMatterLength
}
func (f *Fingerprint) LastCharacterOfLastLabelValue() string {
return f.lastCharacterOfLastLabelValue
}
func (f *Fingerprint) Less(o *Fingerprint) bool {
if f.hash < o.hash {
return true
}
if f.hash > o.hash {
return false
}
if f.firstCharacterOfFirstLabelName < o.firstCharacterOfFirstLabelName {
return true
}
if f.firstCharacterOfFirstLabelName > o.firstCharacterOfFirstLabelName {
return false
}
if f.labelMatterLength < o.labelMatterLength {
return true
}
if f.labelMatterLength > o.labelMatterLength {
return false
}
if f.lastCharacterOfLastLabelValue < o.lastCharacterOfLastLabelValue {
return true
}
if f.lastCharacterOfLastLabelValue > o.lastCharacterOfLastLabelValue {
return false
}
return false
}
func (f *Fingerprint) Equal(o *Fingerprint) bool {
if f.Hash() != o.Hash() {
return false
}
if f.FirstCharacterOfFirstLabelName() != o.FirstCharacterOfFirstLabelName() {
return false
}
if f.LabelMatterLength() != o.LabelMatterLength() {
return false
}
return f.LastCharacterOfLastLabelValue() == o.LastCharacterOfLastLabelValue()
}
// A basic interface only useful in testing contexts for dispensing the time
// in a controlled manner.
type instantProvider interface {
// The current instant.
Now() time.Time
}
// Time is a simple means for fluently wrapping around standard Go timekeeping
// mechanisms to enhance testability without compromising code readability.
//
// It is sufficient for use on bare initialization. A provider should be
// set only for test contexts. When not provided, it emits the current
// system time.
type Time struct {
// The underlying means through which time is provided, if supplied.
Provider instantProvider
}
// Emit the current instant.
func (t *Time) Now() time.Time {
if t.Provider == nil {
return time.Now()
}
return t.Provider.Now()
}

View File

@ -0,0 +1,104 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"runtime"
"testing"
)
func TestFingerprintComparison(t *testing.T) {
fingerprints := []*Fingerprint{
{
hash: 0,
firstCharacterOfFirstLabelName: "b",
labelMatterLength: 1,
lastCharacterOfLastLabelValue: "b",
},
{
hash: 1,
firstCharacterOfFirstLabelName: "a",
labelMatterLength: 0,
lastCharacterOfLastLabelValue: "a",
},
{
hash: 1,
firstCharacterOfFirstLabelName: "a",
labelMatterLength: 1000,
lastCharacterOfLastLabelValue: "b",
},
{
hash: 1,
firstCharacterOfFirstLabelName: "b",
labelMatterLength: 0,
lastCharacterOfLastLabelValue: "a",
},
{
hash: 1,
firstCharacterOfFirstLabelName: "b",
labelMatterLength: 1,
lastCharacterOfLastLabelValue: "a",
},
{
hash: 1,
firstCharacterOfFirstLabelName: "b",
labelMatterLength: 1,
lastCharacterOfLastLabelValue: "b",
},
}
for i := range fingerprints {
if i == 0 {
continue
}
if !fingerprints[i-1].Less(fingerprints[i]) {
t.Errorf("%d expected %s < %s", i, fingerprints[i-1], fingerprints[i])
}
}
}
func BenchmarkFingerprinting(b *testing.B) {
b.StopTimer()
fps := []*Fingerprint{
{
hash: 0,
firstCharacterOfFirstLabelName: "a",
labelMatterLength: 2,
lastCharacterOfLastLabelValue: "z",
},
{
hash: 0,
firstCharacterOfFirstLabelName: "a",
labelMatterLength: 2,
lastCharacterOfLastLabelValue: "z",
},
}
for i := 0; i < 10; i++ {
fps[0].Less(fps[1])
}
b.Logf("N: %v", b.N)
b.StartTimer()
var pre runtime.MemStats
runtime.ReadMemStats(&pre)
for i := 0; i < b.N; i++ {
fps[0].Less(fps[1])
}
var post runtime.MemStats
runtime.ReadMemStats(&post)
b.Logf("allocs: %d items: ", post.TotalAlloc-pre.TotalAlloc)
}

View File

@ -0,0 +1,99 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"io"
"time"
)
const (
// The label name prefix to prepend if a synthetic label is already present
// in the exported metrics.
ExporterLabelPrefix = LabelName("exporter_")
// The label name indicating the metric name of a timeseries.
MetricNameLabel = LabelName("name")
// The label name indicating the job from which a timeseries was scraped.
JobLabel = LabelName("job")
)
// ProcessOptions dictates how the interpreted stream should be rendered for
// consumption.
type ProcessOptions struct {
// Timestamp is added to each value interpreted from the stream.
Timestamp time.Time
// BaseLabels are labels that are accumulated onto each sample, if any.
BaseLabels LabelSet
}
// Processor is responsible for decoding the actual message responses from
// stream into a format that can be consumed with the end result written
// to the results channel.
type Processor interface {
// ProcessSingle treats the input as a single self-contained message body and
// transforms it accordingly. It has no support for streaming.
ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error
}
// Helper function to convert map[string]string into LabelSet.
//
// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is
// smart enough to unmarshal JSON objects into LabelSet directly.
func labelSet(labels map[string]string) LabelSet {
labelset := make(LabelSet, len(labels))
for k, v := range labels {
labelset[LabelName(k)] = LabelValue(v)
}
return labelset
}
// Helper function to merge a target's base labels ontop of the labels of an
// exported sample. If a label is already defined in the exported sample, we
// assume that we are scraping an intermediate exporter and attach
// "exporter_"-prefixes to Prometheus' own base labels.
func mergeTargetLabels(entityLabels, targetLabels LabelSet) LabelSet {
if targetLabels == nil {
targetLabels = LabelSet{}
}
result := LabelSet{}
for label, value := range entityLabels {
result[label] = value
}
for label, labelValue := range targetLabels {
if _, exists := result[label]; exists {
result[ExporterLabelPrefix+label] = labelValue
} else {
result[label] = labelValue
}
}
return result
}
// Result encapsulates the outcome from processing samples from a source.
type Result struct {
Err error
Samples Samples
}
// A LabelName is a key for a LabelSet or Metric. It has a value associated
// therewith.
type LabelName string

View File

@ -0,0 +1,134 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
)
const (
baseLabels001 = "baseLabels"
counter001 = "counter"
docstring001 = "docstring"
gauge001 = "gauge"
histogram001 = "histogram"
labels001 = "labels"
metric001 = "metric"
type001 = "type"
value001 = "value"
percentile001 = "percentile"
)
// Processor002 is responsible for decoding payloads from protocol version
// 0.0.1.
var Processor001 Processor = &processor001{}
// processor001 is responsible for handling API version 0.0.1.
type processor001 struct {
time Time
}
// entity001 represents a the JSON structure that 0.0.1 uses.
type entity001 []struct {
BaseLabels map[string]string `json:"baseLabels"`
Docstring string `json:"docstring"`
Metric struct {
MetricType string `json:"type"`
Value []struct {
Labels map[string]string `json:"labels"`
Value interface{} `json:"value"`
} `json:"value"`
} `json:"metric"`
}
func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
// TODO(matt): Replace with plain-jane JSON unmarshalling.
buffer, err := ioutil.ReadAll(in)
if err != nil {
return err
}
entities := entity001{}
if err = json.Unmarshal(buffer, &entities); err != nil {
return err
}
// TODO(matt): This outer loop is a great basis for parallelization.
pendingSamples := Samples{}
for _, entity := range entities {
for _, value := range entity.Metric.Value {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels))
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
switch entity.Metric.MetricType {
case gauge001, counter001:
sampleValue, ok := value.Value.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
out <- &Result{Err: err}
continue
}
pendingSamples = append(pendingSamples, &Sample{
Metric: Metric(labels),
Timestamp: o.Timestamp,
Value: SampleValue(sampleValue),
})
break
case histogram001:
sampleValue, ok := value.Value.(map[string]interface{})
if !ok {
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
out <- &Result{Err: err}
continue
}
for percentile, percentileValue := range sampleValue {
individualValue, ok := percentileValue.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
out <- &Result{Err: err}
continue
}
childMetric := make(map[LabelName]LabelValue, len(labels)+1)
for k, v := range labels {
childMetric[k] = v
}
childMetric[LabelName(percentile001)] = LabelValue(percentile)
pendingSamples = append(pendingSamples, &Sample{
Metric: Metric(childMetric),
Timestamp: o.Timestamp,
Value: SampleValue(individualValue),
})
}
break
}
}
}
if len(pendingSamples) > 0 {
out <- &Result{Samples: pendingSamples}
}
return nil
}

View File

@ -0,0 +1,216 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"container/list"
"fmt"
"os"
"path"
"testing"
"time"
)
func testProcessor001Process(t tester) {
var scenarios = []struct {
in string
baseLabels LabelSet
out Samples
err error
}{
{
in: "empty.json",
err: fmt.Errorf("unexpected end of JSON input"),
},
{
in: "test0_0_1-0_0_2.json",
baseLabels: LabelSet{
JobLabel: "batch_exporter",
},
out: Samples{
&Sample{
Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.6120456642749681,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 97.31798360385088,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 84.63044031436561,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.355915069887731,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.89202084295582,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 160.21100853053224,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.772733213161236,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.99626121011262,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 172.49828748957728,
},
},
},
}
for i, scenario := range scenarios {
inputChannel := make(chan *Result, 1024)
defer close(inputChannel)
reader, err := os.Open(path.Join("fixtures", scenario.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
}
options := &ProcessOptions{
Timestamp: time.Now(),
BaseLabels: scenario.baseLabels,
}
err = Processor001.ProcessSingle(reader, inputChannel, options)
if !errorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue
}
delivered := Samples{}
for len(inputChannel) != 0 {
result := <-inputChannel
if result.Err != nil {
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
}
delivered = append(delivered, result.Samples...)
}
if len(delivered) != len(scenario.out) {
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
continue
}
expectedElements := list.New()
for _, j := range scenario.out {
expectedElements.PushBack(j)
}
for j := 0; j < len(delivered); j++ {
actual := delivered[j]
found := false
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
candidate := element.Value.(*Sample)
if candidate.Value != actual.Value {
continue
}
if len(candidate.Metric) != len(actual.Metric) {
continue
}
labelsMatch := false
for key, value := range candidate.Metric {
actualValue, ok := actual.Metric[key]
if !ok {
break
}
if actualValue == value {
labelsMatch = true
break
}
}
if !labelsMatch {
continue
}
// XXX: Test time.
found = true
expectedElements.Remove(element)
}
if !found {
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
}
}
}
}
func TestProcessor001Process(t *testing.T) {
testProcessor001Process(t)
}
func BenchmarkProcessor001Process(b *testing.B) {
for i := 0; i < b.N; i++ {
testProcessor001Process(b)
}
}

View File

@ -0,0 +1,114 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"encoding/json"
"fmt"
"io"
)
// Processor002 is responsible for decoding payloads from protocol version
// 0.0.2.
var Processor002 = &processor002{}
type histogram002 struct {
Labels map[string]string `json:"labels"`
Values map[string]SampleValue `json:"value"`
}
type counter002 struct {
Labels map[string]string `json:"labels"`
Value SampleValue `json:"value"`
}
type processor002 struct{}
func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
// Processor for telemetry schema version 0.0.2.
// container for telemetry data
var entities []struct {
BaseLabels map[string]string `json:"baseLabels"`
Docstring string `json:"docstring"`
Metric struct {
Type string `json:"type"`
Values json.RawMessage `json:"value"`
} `json:"metric"`
}
if err := json.NewDecoder(in).Decode(&entities); err != nil {
return err
}
pendingSamples := Samples{}
for _, entity := range entities {
switch entity.Metric.Type {
case "counter", "gauge":
var values []counter002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
out <- &Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
}
continue
}
for _, counter := range values {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels))
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
pendingSamples = append(pendingSamples, &Sample{
Metric: Metric(labels),
Timestamp: o.Timestamp,
Value: counter.Value,
})
}
case "histogram":
var values []histogram002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
out <- &Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
}
continue
}
for _, histogram := range values {
for percentile, value := range histogram.Values {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels))
entityLabels[LabelName("percentile")] = LabelValue(percentile)
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
pendingSamples = append(pendingSamples, &Sample{
Metric: Metric(labels),
Timestamp: o.Timestamp,
Value: value,
})
}
}
default:
out <- &Result{
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
}
}
}
if len(pendingSamples) > 0 {
out <- &Result{Samples: pendingSamples}
}
return nil
}

View File

@ -0,0 +1,231 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package decoding
import (
"container/list"
"fmt"
"os"
"path"
"runtime"
"testing"
"time"
)
func testProcessor002Process(t tester) {
var scenarios = []struct {
in string
baseLabels LabelSet
out Samples
err error
}{
{
in: "empty.json",
err: fmt.Errorf("EOF"),
},
{
in: "test0_0_1-0_0_2.json",
baseLabels: LabelSet{
JobLabel: "batch_exporter",
},
out: Samples{
&Sample{
Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&Sample{
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&Sample{
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.6120456642749681,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 97.31798360385088,
},
&Sample{
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 84.63044031436561,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.355915069887731,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.89202084295582,
},
&Sample{
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 160.21100853053224,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.772733213161236,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.99626121011262,
},
&Sample{
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 172.49828748957728,
},
},
},
}
for i, scenario := range scenarios {
inputChannel := make(chan *Result, 1024)
defer close(inputChannel)
reader, err := os.Open(path.Join("fixtures", scenario.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
}
options := &ProcessOptions{
Timestamp: time.Now(),
BaseLabels: scenario.baseLabels,
}
err = Processor002.ProcessSingle(reader, inputChannel, options)
if !errorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue
}
delivered := Samples{}
for len(inputChannel) != 0 {
result := <-inputChannel
if result.Err != nil {
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
}
delivered = append(delivered, result.Samples...)
}
if len(delivered) != len(scenario.out) {
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
continue
}
expectedElements := list.New()
for _, j := range scenario.out {
expectedElements.PushBack(j)
}
for j := 0; j < len(delivered); j++ {
actual := delivered[j]
found := false
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
candidate := element.Value.(*Sample)
if candidate.Value != actual.Value {
continue
}
if len(candidate.Metric) != len(actual.Metric) {
continue
}
labelsMatch := false
for key, value := range candidate.Metric {
actualValue, ok := actual.Metric[key]
if !ok {
break
}
if actualValue == value {
labelsMatch = true
break
}
}
if !labelsMatch {
continue
}
// XXX: Test time.
found = true
expectedElements.Remove(element)
}
if !found {
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
}
}
}
}
func TestProcessor002Process(t *testing.T) {
testProcessor002Process(t)
}
func BenchmarkProcessor002Process(b *testing.B) {
b.StopTimer()
pre := runtime.MemStats{}
runtime.ReadMemStats(&pre)
b.StartTimer()
for i := 0; i < b.N; i++ {
testProcessor002Process(b)
}
post := runtime.MemStats{}
runtime.ReadMemStats(&post)
allocated := post.TotalAlloc - pre.TotalAlloc
b.Logf("Allocated %d at %f per cycle with %d cycles.", allocated, float64(allocated)/float64(b.N), b.N)
}

View File

@ -4,7 +4,8 @@
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.
// Prometheus' client side metric primitives and telemetry exposition framework.
// Package prometheus provides client side metric primitives and a telemetry
// exposition framework.
//
// This package provides both metric primitives and tools for their exposition
// to the Prometheus time series collection and computation framework.