Merge pull request #18 from prometheus/refactor/decoder-inclusion
Extract core Prometheus value decoders.
This commit is contained in:
commit
d36afbbd8d
|
@ -1,4 +1,7 @@
|
|||
language: go
|
||||
|
||||
go:
|
||||
- 1.1
|
||||
|
||||
script:
|
||||
- make -f Makefile.TRAVIS
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package decoding decodes Prometheus clients' data streams for consumers.
|
||||
package decoding
|
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"mime"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// ProcessorForRequestHeader interprets a HTTP request header to determine
|
||||
// what Processor should be used for the given input. If no acceptable
|
||||
// Processor can be found, an error is returned.
|
||||
func ProcessorForRequestHeader(header http.Header) (Processor, error) {
|
||||
if header == nil {
|
||||
return nil, fmt.Errorf("Received illegal and nil header.")
|
||||
}
|
||||
|
||||
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
|
||||
}
|
||||
if mediatype != "application/json" {
|
||||
return nil, fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
|
||||
}
|
||||
|
||||
var prometheusApiVersion string
|
||||
|
||||
if params["schema"] == "prometheus/telemetry" && params["version"] != "" {
|
||||
prometheusApiVersion = params["version"]
|
||||
} else {
|
||||
prometheusApiVersion = header.Get("X-Prometheus-API-Version")
|
||||
}
|
||||
|
||||
switch prometheusApiVersion {
|
||||
case "0.0.2":
|
||||
return Processor002, nil
|
||||
case "0.0.1":
|
||||
return Processor001, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Unrecognized API version %s", prometheusApiVersion)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func testDiscriminatorHttpHeader(t tester) {
|
||||
var scenarios = []struct {
|
||||
input map[string]string
|
||||
output Processor
|
||||
err error
|
||||
}{
|
||||
{
|
||||
output: nil,
|
||||
err: fmt.Errorf("Received illegal and nil header."),
|
||||
},
|
||||
{
|
||||
input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.0"},
|
||||
output: nil,
|
||||
err: fmt.Errorf("Unrecognized API version 0.0.0"),
|
||||
},
|
||||
{
|
||||
input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.1"},
|
||||
output: Processor001,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.0`},
|
||||
output: nil,
|
||||
err: fmt.Errorf("Unrecognized API version 0.0.0"),
|
||||
},
|
||||
{
|
||||
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.1`},
|
||||
output: Processor001,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.2`},
|
||||
output: Processor002,
|
||||
err: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
var header http.Header
|
||||
|
||||
if len(scenario.input) > 0 {
|
||||
header = http.Header{}
|
||||
}
|
||||
|
||||
for key, value := range scenario.input {
|
||||
header.Add(key, value)
|
||||
}
|
||||
|
||||
actual, err := ProcessorForRequestHeader(header)
|
||||
|
||||
if scenario.err != err {
|
||||
if scenario.err != nil && err != nil {
|
||||
if scenario.err.Error() != err.Error() {
|
||||
t.Errorf("%d. expected %s, got %s", i, scenario.err, err)
|
||||
}
|
||||
} else if scenario.err != nil || err != nil {
|
||||
t.Errorf("%d. expected %s, got %s", i, scenario.err, err)
|
||||
}
|
||||
}
|
||||
|
||||
if scenario.output != actual {
|
||||
t.Errorf("%d. expected %s, got %s", i, scenario.output, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscriminatorHttpHeader(t *testing.T) {
|
||||
testDiscriminatorHttpHeader(t)
|
||||
}
|
||||
|
||||
func BenchmarkDiscriminatorHttpHeader(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testDiscriminatorHttpHeader(b)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
[
|
||||
{
|
||||
"baseLabels": {
|
||||
"name": "rpc_calls_total",
|
||||
"job": "batch_job"
|
||||
},
|
||||
"docstring": "RPC calls.",
|
||||
"metric": {
|
||||
"type": "counter",
|
||||
"value": [
|
||||
{
|
||||
"labels": {
|
||||
"service": "zed"
|
||||
},
|
||||
"value": 25
|
||||
},
|
||||
{
|
||||
"labels": {
|
||||
"service": "bar"
|
||||
},
|
||||
"value": 25
|
||||
},
|
||||
{
|
||||
"labels": {
|
||||
"service": "foo"
|
||||
},
|
||||
"value": 25
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
{
|
||||
"baseLabels": {
|
||||
"name": "rpc_latency_microseconds"
|
||||
},
|
||||
"docstring": "RPC latency.",
|
||||
"metric": {
|
||||
"type": "histogram",
|
||||
"value": [
|
||||
{
|
||||
"labels": {
|
||||
"service": "foo"
|
||||
},
|
||||
"value": {
|
||||
"0.010000": 15.890724674774395,
|
||||
"0.050000": 15.890724674774395,
|
||||
"0.500000": 84.63044031436561,
|
||||
"0.900000": 160.21100853053224,
|
||||
"0.990000": 172.49828748957728
|
||||
}
|
||||
},
|
||||
{
|
||||
"labels": {
|
||||
"service": "zed"
|
||||
},
|
||||
"value": {
|
||||
"0.010000": 0.0459814091918713,
|
||||
"0.050000": 0.0459814091918713,
|
||||
"0.500000": 0.6120456642749681,
|
||||
"0.900000": 1.355915069887731,
|
||||
"0.990000": 1.772733213161236
|
||||
}
|
||||
},
|
||||
{
|
||||
"labels": {
|
||||
"service": "bar"
|
||||
},
|
||||
"value": {
|
||||
"0.010000": 78.48563317257356,
|
||||
"0.050000": 78.48563317257356,
|
||||
"0.500000": 97.31798360385088,
|
||||
"0.900000": 109.89202084295582,
|
||||
"0.990000": 109.99626121011262
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
type tester interface {
|
||||
Error(args ...interface{})
|
||||
Errorf(format string, args ...interface{})
|
||||
Fatal(args ...interface{})
|
||||
Fatalf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
// errorEqual compares Go errors for equality.
|
||||
func errorEqual(left, right error) bool {
|
||||
if left == right {
|
||||
return true
|
||||
}
|
||||
|
||||
if left != nil && right != nil {
|
||||
if left.Error() == right.Error() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
|
@ -0,0 +1,287 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Sample struct {
|
||||
Metric Metric
|
||||
Value SampleValue
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
func (s *Sample) Equal(o *Sample) bool {
|
||||
if !s.Metric.Equal(o.Metric) {
|
||||
return false
|
||||
}
|
||||
if !s.Timestamp.Equal(o.Timestamp) {
|
||||
return false
|
||||
}
|
||||
if !s.Value.Equal(o.Value) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
type Samples []*Sample
|
||||
|
||||
func (s Samples) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s Samples) Less(i, j int) bool {
|
||||
switch {
|
||||
case s[i].Metric.Before(s[j].Metric):
|
||||
return true
|
||||
case s[i].Timestamp.Before(s[j].Timestamp):
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s Samples) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
// A LabelSet is a collection of LabelName and LabelValue pairs. The LabelSet
|
||||
// may be fully-qualified down to the point where it may resolve to a single
|
||||
// Metric in the data store or not. All operations that occur within the realm
|
||||
// of a LabelSet can emit a vector of Metric entities to which the LabelSet may
|
||||
// match.
|
||||
type LabelSet map[LabelName]LabelValue
|
||||
|
||||
// Helper function to non-destructively merge two label sets.
|
||||
func (l LabelSet) Merge(other LabelSet) LabelSet {
|
||||
result := make(LabelSet, len(l))
|
||||
|
||||
for k, v := range l {
|
||||
result[k] = v
|
||||
}
|
||||
|
||||
for k, v := range other {
|
||||
result[k] = v
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (l LabelSet) String() string {
|
||||
labelStrings := make([]string, 0, len(l))
|
||||
for label, value := range l {
|
||||
labelStrings = append(labelStrings, fmt.Sprintf("%s='%s'", label, value))
|
||||
}
|
||||
|
||||
sort.Strings(labelStrings)
|
||||
|
||||
return fmt.Sprintf("{%s}", strings.Join(labelStrings, ", "))
|
||||
}
|
||||
|
||||
// A LabelValue is an associated value for a LabelName.
|
||||
type LabelValue string
|
||||
|
||||
// A Metric is similar to a LabelSet, but the key difference is that a Metric is
|
||||
// a singleton and refers to one and only one stream of samples.
|
||||
type Metric map[LabelName]LabelValue
|
||||
|
||||
func (m Metric) Equal(o Metric) bool {
|
||||
lFingerprint := &Fingerprint{}
|
||||
rFingerprint := &Fingerprint{}
|
||||
|
||||
m.WriteFingerprint(lFingerprint)
|
||||
o.WriteFingerprint(rFingerprint)
|
||||
|
||||
return lFingerprint.Equal(rFingerprint)
|
||||
}
|
||||
|
||||
func (m Metric) Before(o Metric) bool {
|
||||
lFingerprint := &Fingerprint{}
|
||||
rFingerprint := &Fingerprint{}
|
||||
|
||||
m.WriteFingerprint(lFingerprint)
|
||||
o.WriteFingerprint(rFingerprint)
|
||||
|
||||
return m.Before(o)
|
||||
}
|
||||
|
||||
func (m Metric) WriteFingerprint(f *Fingerprint) {
|
||||
labelLength := len(m)
|
||||
labelNames := make([]string, 0, labelLength)
|
||||
|
||||
for labelName := range m {
|
||||
labelNames = append(labelNames, string(labelName))
|
||||
}
|
||||
|
||||
sort.Strings(labelNames)
|
||||
|
||||
summer := fnv.New64a()
|
||||
firstCharacterOfFirstLabelName := ""
|
||||
lastCharacterOfLastLabelValue := ""
|
||||
labelMatterLength := 0
|
||||
|
||||
for i, labelName := range labelNames {
|
||||
labelValue := m[LabelName(labelName)]
|
||||
labelNameLength := len(labelName)
|
||||
labelValueLength := len(labelValue)
|
||||
labelMatterLength += labelNameLength + labelValueLength
|
||||
|
||||
if i == 0 {
|
||||
firstCharacterOfFirstLabelName = labelName[0:1]
|
||||
}
|
||||
if i == labelLength-1 {
|
||||
lastCharacterOfLastLabelValue = string(labelValue[labelValueLength-1 : labelValueLength])
|
||||
}
|
||||
|
||||
fmt.Fprintf(summer, "%s%s%s", labelName, `"`, labelValue)
|
||||
}
|
||||
|
||||
f.firstCharacterOfFirstLabelName = firstCharacterOfFirstLabelName
|
||||
f.hash = binary.LittleEndian.Uint64(summer.Sum(nil))
|
||||
f.labelMatterLength = uint(labelMatterLength % 10)
|
||||
f.lastCharacterOfLastLabelValue = lastCharacterOfLastLabelValue
|
||||
|
||||
}
|
||||
|
||||
// A SampleValue is a representation of a value for a given sample at a given
|
||||
// time.
|
||||
type SampleValue float64
|
||||
|
||||
func (v SampleValue) Equal(o SampleValue) bool {
|
||||
return v == o
|
||||
}
|
||||
|
||||
func (v SampleValue) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf(`"%f"`, v)), nil
|
||||
}
|
||||
|
||||
func (v SampleValue) String() string {
|
||||
return fmt.Sprint(float64(v))
|
||||
}
|
||||
|
||||
// Fingerprint provides a hash-capable representation of a Metric.
|
||||
type Fingerprint struct {
|
||||
// A hashed representation of the underyling entity. For our purposes, FNV-1A
|
||||
// 64-bit is used.
|
||||
hash uint64
|
||||
firstCharacterOfFirstLabelName string
|
||||
labelMatterLength uint
|
||||
lastCharacterOfLastLabelValue string
|
||||
}
|
||||
|
||||
func (f *Fingerprint) String() string {
|
||||
return f.ToRowKey()
|
||||
}
|
||||
|
||||
// Transforms the Fingerprint into a database row key.
|
||||
func (f *Fingerprint) ToRowKey() string {
|
||||
return strings.Join([]string{fmt.Sprintf("%020d", f.hash), f.firstCharacterOfFirstLabelName, fmt.Sprint(f.labelMatterLength), f.lastCharacterOfLastLabelValue}, "-")
|
||||
}
|
||||
|
||||
func (f *Fingerprint) Hash() uint64 {
|
||||
return f.hash
|
||||
}
|
||||
|
||||
func (f *Fingerprint) FirstCharacterOfFirstLabelName() string {
|
||||
return f.firstCharacterOfFirstLabelName
|
||||
}
|
||||
|
||||
func (f *Fingerprint) LabelMatterLength() uint {
|
||||
return f.labelMatterLength
|
||||
}
|
||||
|
||||
func (f *Fingerprint) LastCharacterOfLastLabelValue() string {
|
||||
return f.lastCharacterOfLastLabelValue
|
||||
}
|
||||
|
||||
func (f *Fingerprint) Less(o *Fingerprint) bool {
|
||||
if f.hash < o.hash {
|
||||
return true
|
||||
}
|
||||
if f.hash > o.hash {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.firstCharacterOfFirstLabelName < o.firstCharacterOfFirstLabelName {
|
||||
return true
|
||||
}
|
||||
if f.firstCharacterOfFirstLabelName > o.firstCharacterOfFirstLabelName {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.labelMatterLength < o.labelMatterLength {
|
||||
return true
|
||||
}
|
||||
if f.labelMatterLength > o.labelMatterLength {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.lastCharacterOfLastLabelValue < o.lastCharacterOfLastLabelValue {
|
||||
return true
|
||||
}
|
||||
if f.lastCharacterOfLastLabelValue > o.lastCharacterOfLastLabelValue {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (f *Fingerprint) Equal(o *Fingerprint) bool {
|
||||
if f.Hash() != o.Hash() {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.FirstCharacterOfFirstLabelName() != o.FirstCharacterOfFirstLabelName() {
|
||||
return false
|
||||
}
|
||||
|
||||
if f.LabelMatterLength() != o.LabelMatterLength() {
|
||||
return false
|
||||
}
|
||||
|
||||
return f.LastCharacterOfLastLabelValue() == o.LastCharacterOfLastLabelValue()
|
||||
}
|
||||
|
||||
// A basic interface only useful in testing contexts for dispensing the time
|
||||
// in a controlled manner.
|
||||
type instantProvider interface {
|
||||
// The current instant.
|
||||
Now() time.Time
|
||||
}
|
||||
|
||||
// Time is a simple means for fluently wrapping around standard Go timekeeping
|
||||
// mechanisms to enhance testability without compromising code readability.
|
||||
//
|
||||
// It is sufficient for use on bare initialization. A provider should be
|
||||
// set only for test contexts. When not provided, it emits the current
|
||||
// system time.
|
||||
type Time struct {
|
||||
// The underlying means through which time is provided, if supplied.
|
||||
Provider instantProvider
|
||||
}
|
||||
|
||||
// Emit the current instant.
|
||||
func (t *Time) Now() time.Time {
|
||||
if t.Provider == nil {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
return t.Provider.Now()
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFingerprintComparison(t *testing.T) {
|
||||
fingerprints := []*Fingerprint{
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "b",
|
||||
labelMatterLength: 1,
|
||||
lastCharacterOfLastLabelValue: "b",
|
||||
},
|
||||
{
|
||||
hash: 1,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 0,
|
||||
lastCharacterOfLastLabelValue: "a",
|
||||
},
|
||||
{
|
||||
hash: 1,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 1000,
|
||||
lastCharacterOfLastLabelValue: "b",
|
||||
},
|
||||
{
|
||||
hash: 1,
|
||||
firstCharacterOfFirstLabelName: "b",
|
||||
labelMatterLength: 0,
|
||||
lastCharacterOfLastLabelValue: "a",
|
||||
},
|
||||
{
|
||||
hash: 1,
|
||||
firstCharacterOfFirstLabelName: "b",
|
||||
labelMatterLength: 1,
|
||||
lastCharacterOfLastLabelValue: "a",
|
||||
},
|
||||
{
|
||||
hash: 1,
|
||||
firstCharacterOfFirstLabelName: "b",
|
||||
labelMatterLength: 1,
|
||||
lastCharacterOfLastLabelValue: "b",
|
||||
},
|
||||
}
|
||||
for i := range fingerprints {
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if !fingerprints[i-1].Less(fingerprints[i]) {
|
||||
t.Errorf("%d expected %s < %s", i, fingerprints[i-1], fingerprints[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkFingerprinting(b *testing.B) {
|
||||
b.StopTimer()
|
||||
fps := []*Fingerprint{
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 2,
|
||||
lastCharacterOfLastLabelValue: "z",
|
||||
},
|
||||
{
|
||||
hash: 0,
|
||||
firstCharacterOfFirstLabelName: "a",
|
||||
labelMatterLength: 2,
|
||||
lastCharacterOfLastLabelValue: "z",
|
||||
},
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
fps[0].Less(fps[1])
|
||||
}
|
||||
b.Logf("N: %v", b.N)
|
||||
b.StartTimer()
|
||||
|
||||
var pre runtime.MemStats
|
||||
runtime.ReadMemStats(&pre)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
fps[0].Less(fps[1])
|
||||
}
|
||||
|
||||
var post runtime.MemStats
|
||||
runtime.ReadMemStats(&post)
|
||||
|
||||
b.Logf("allocs: %d items: ", post.TotalAlloc-pre.TotalAlloc)
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// The label name prefix to prepend if a synthetic label is already present
|
||||
// in the exported metrics.
|
||||
ExporterLabelPrefix = LabelName("exporter_")
|
||||
|
||||
// The label name indicating the metric name of a timeseries.
|
||||
MetricNameLabel = LabelName("name")
|
||||
|
||||
// The label name indicating the job from which a timeseries was scraped.
|
||||
JobLabel = LabelName("job")
|
||||
)
|
||||
|
||||
// ProcessOptions dictates how the interpreted stream should be rendered for
|
||||
// consumption.
|
||||
type ProcessOptions struct {
|
||||
// Timestamp is added to each value interpreted from the stream.
|
||||
Timestamp time.Time
|
||||
|
||||
// BaseLabels are labels that are accumulated onto each sample, if any.
|
||||
BaseLabels LabelSet
|
||||
}
|
||||
|
||||
// Processor is responsible for decoding the actual message responses from
|
||||
// stream into a format that can be consumed with the end result written
|
||||
// to the results channel.
|
||||
type Processor interface {
|
||||
// ProcessSingle treats the input as a single self-contained message body and
|
||||
// transforms it accordingly. It has no support for streaming.
|
||||
ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error
|
||||
}
|
||||
|
||||
// Helper function to convert map[string]string into LabelSet.
|
||||
//
|
||||
// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is
|
||||
// smart enough to unmarshal JSON objects into LabelSet directly.
|
||||
func labelSet(labels map[string]string) LabelSet {
|
||||
labelset := make(LabelSet, len(labels))
|
||||
|
||||
for k, v := range labels {
|
||||
labelset[LabelName(k)] = LabelValue(v)
|
||||
}
|
||||
|
||||
return labelset
|
||||
}
|
||||
|
||||
// Helper function to merge a target's base labels ontop of the labels of an
|
||||
// exported sample. If a label is already defined in the exported sample, we
|
||||
// assume that we are scraping an intermediate exporter and attach
|
||||
// "exporter_"-prefixes to Prometheus' own base labels.
|
||||
func mergeTargetLabels(entityLabels, targetLabels LabelSet) LabelSet {
|
||||
if targetLabels == nil {
|
||||
targetLabels = LabelSet{}
|
||||
}
|
||||
|
||||
result := LabelSet{}
|
||||
|
||||
for label, value := range entityLabels {
|
||||
result[label] = value
|
||||
}
|
||||
|
||||
for label, labelValue := range targetLabels {
|
||||
if _, exists := result[label]; exists {
|
||||
result[ExporterLabelPrefix+label] = labelValue
|
||||
} else {
|
||||
result[label] = labelValue
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Result encapsulates the outcome from processing samples from a source.
|
||||
type Result struct {
|
||||
Err error
|
||||
Samples Samples
|
||||
}
|
||||
|
||||
// A LabelName is a key for a LabelSet or Metric. It has a value associated
|
||||
// therewith.
|
||||
type LabelName string
|
|
@ -0,0 +1,134 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const (
|
||||
baseLabels001 = "baseLabels"
|
||||
counter001 = "counter"
|
||||
docstring001 = "docstring"
|
||||
gauge001 = "gauge"
|
||||
histogram001 = "histogram"
|
||||
labels001 = "labels"
|
||||
metric001 = "metric"
|
||||
type001 = "type"
|
||||
value001 = "value"
|
||||
percentile001 = "percentile"
|
||||
)
|
||||
|
||||
// Processor002 is responsible for decoding payloads from protocol version
|
||||
// 0.0.1.
|
||||
var Processor001 Processor = &processor001{}
|
||||
|
||||
// processor001 is responsible for handling API version 0.0.1.
|
||||
type processor001 struct {
|
||||
time Time
|
||||
}
|
||||
|
||||
// entity001 represents a the JSON structure that 0.0.1 uses.
|
||||
type entity001 []struct {
|
||||
BaseLabels map[string]string `json:"baseLabels"`
|
||||
Docstring string `json:"docstring"`
|
||||
Metric struct {
|
||||
MetricType string `json:"type"`
|
||||
Value []struct {
|
||||
Labels map[string]string `json:"labels"`
|
||||
Value interface{} `json:"value"`
|
||||
} `json:"value"`
|
||||
} `json:"metric"`
|
||||
}
|
||||
|
||||
func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
|
||||
// TODO(matt): Replace with plain-jane JSON unmarshalling.
|
||||
buffer, err := ioutil.ReadAll(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entities := entity001{}
|
||||
if err = json.Unmarshal(buffer, &entities); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(matt): This outer loop is a great basis for parallelization.
|
||||
pendingSamples := Samples{}
|
||||
for _, entity := range entities {
|
||||
for _, value := range entity.Metric.Value {
|
||||
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels))
|
||||
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
|
||||
|
||||
switch entity.Metric.MetricType {
|
||||
case gauge001, counter001:
|
||||
sampleValue, ok := value.Value.(float64)
|
||||
if !ok {
|
||||
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
|
||||
out <- &Result{Err: err}
|
||||
continue
|
||||
}
|
||||
|
||||
pendingSamples = append(pendingSamples, &Sample{
|
||||
Metric: Metric(labels),
|
||||
Timestamp: o.Timestamp,
|
||||
Value: SampleValue(sampleValue),
|
||||
})
|
||||
|
||||
break
|
||||
|
||||
case histogram001:
|
||||
sampleValue, ok := value.Value.(map[string]interface{})
|
||||
if !ok {
|
||||
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
|
||||
out <- &Result{Err: err}
|
||||
continue
|
||||
}
|
||||
|
||||
for percentile, percentileValue := range sampleValue {
|
||||
individualValue, ok := percentileValue.(float64)
|
||||
if !ok {
|
||||
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
|
||||
out <- &Result{Err: err}
|
||||
continue
|
||||
}
|
||||
|
||||
childMetric := make(map[LabelName]LabelValue, len(labels)+1)
|
||||
|
||||
for k, v := range labels {
|
||||
childMetric[k] = v
|
||||
}
|
||||
|
||||
childMetric[LabelName(percentile001)] = LabelValue(percentile)
|
||||
|
||||
pendingSamples = append(pendingSamples, &Sample{
|
||||
Metric: Metric(childMetric),
|
||||
Timestamp: o.Timestamp,
|
||||
Value: SampleValue(individualValue),
|
||||
})
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(pendingSamples) > 0 {
|
||||
out <- &Result{Samples: pendingSamples}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,216 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func testProcessor001Process(t tester) {
|
||||
var scenarios = []struct {
|
||||
in string
|
||||
baseLabels LabelSet
|
||||
out Samples
|
||||
err error
|
||||
}{
|
||||
{
|
||||
in: "empty.json",
|
||||
err: fmt.Errorf("unexpected end of JSON input"),
|
||||
},
|
||||
{
|
||||
in: "test0_0_1-0_0_2.json",
|
||||
baseLabels: LabelSet{
|
||||
JobLabel: "batch_exporter",
|
||||
},
|
||||
out: Samples{
|
||||
&Sample{
|
||||
Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.0459814091918713,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 78.48563317257356,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 15.890724674774395,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.0459814091918713,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 78.48563317257356,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 15.890724674774395,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.6120456642749681,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 97.31798360385088,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 84.63044031436561,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 1.355915069887731,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 109.89202084295582,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 160.21100853053224,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 1.772733213161236,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 109.99626121011262,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 172.49828748957728,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
inputChannel := make(chan *Result, 1024)
|
||||
|
||||
defer close(inputChannel)
|
||||
|
||||
reader, err := os.Open(path.Join("fixtures", scenario.in))
|
||||
if err != nil {
|
||||
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
|
||||
}
|
||||
|
||||
options := &ProcessOptions{
|
||||
Timestamp: time.Now(),
|
||||
BaseLabels: scenario.baseLabels,
|
||||
}
|
||||
err = Processor001.ProcessSingle(reader, inputChannel, options)
|
||||
if !errorEqual(scenario.err, err) {
|
||||
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
delivered := Samples{}
|
||||
|
||||
for len(inputChannel) != 0 {
|
||||
result := <-inputChannel
|
||||
if result.Err != nil {
|
||||
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
|
||||
}
|
||||
delivered = append(delivered, result.Samples...)
|
||||
}
|
||||
|
||||
if len(delivered) != len(scenario.out) {
|
||||
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
expectedElements := list.New()
|
||||
for _, j := range scenario.out {
|
||||
expectedElements.PushBack(j)
|
||||
}
|
||||
|
||||
for j := 0; j < len(delivered); j++ {
|
||||
actual := delivered[j]
|
||||
|
||||
found := false
|
||||
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
|
||||
candidate := element.Value.(*Sample)
|
||||
|
||||
if candidate.Value != actual.Value {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(candidate.Metric) != len(actual.Metric) {
|
||||
continue
|
||||
}
|
||||
|
||||
labelsMatch := false
|
||||
|
||||
for key, value := range candidate.Metric {
|
||||
actualValue, ok := actual.Metric[key]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if actualValue == value {
|
||||
labelsMatch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !labelsMatch {
|
||||
continue
|
||||
}
|
||||
|
||||
// XXX: Test time.
|
||||
found = true
|
||||
expectedElements.Remove(element)
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessor001Process(t *testing.T) {
|
||||
testProcessor001Process(t)
|
||||
}
|
||||
|
||||
func BenchmarkProcessor001Process(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
testProcessor001Process(b)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Processor002 is responsible for decoding payloads from protocol version
|
||||
// 0.0.2.
|
||||
var Processor002 = &processor002{}
|
||||
|
||||
type histogram002 struct {
|
||||
Labels map[string]string `json:"labels"`
|
||||
Values map[string]SampleValue `json:"value"`
|
||||
}
|
||||
|
||||
type counter002 struct {
|
||||
Labels map[string]string `json:"labels"`
|
||||
Value SampleValue `json:"value"`
|
||||
}
|
||||
|
||||
type processor002 struct{}
|
||||
|
||||
func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
|
||||
// Processor for telemetry schema version 0.0.2.
|
||||
// container for telemetry data
|
||||
var entities []struct {
|
||||
BaseLabels map[string]string `json:"baseLabels"`
|
||||
Docstring string `json:"docstring"`
|
||||
Metric struct {
|
||||
Type string `json:"type"`
|
||||
Values json.RawMessage `json:"value"`
|
||||
} `json:"metric"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(in).Decode(&entities); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pendingSamples := Samples{}
|
||||
for _, entity := range entities {
|
||||
switch entity.Metric.Type {
|
||||
case "counter", "gauge":
|
||||
var values []counter002
|
||||
|
||||
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
||||
out <- &Result{
|
||||
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, counter := range values {
|
||||
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels))
|
||||
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
|
||||
|
||||
pendingSamples = append(pendingSamples, &Sample{
|
||||
Metric: Metric(labels),
|
||||
Timestamp: o.Timestamp,
|
||||
Value: counter.Value,
|
||||
})
|
||||
}
|
||||
|
||||
case "histogram":
|
||||
var values []histogram002
|
||||
|
||||
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
||||
out <- &Result{
|
||||
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, histogram := range values {
|
||||
for percentile, value := range histogram.Values {
|
||||
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels))
|
||||
entityLabels[LabelName("percentile")] = LabelValue(percentile)
|
||||
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
|
||||
|
||||
pendingSamples = append(pendingSamples, &Sample{
|
||||
Metric: Metric(labels),
|
||||
Timestamp: o.Timestamp,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
out <- &Result{
|
||||
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(pendingSamples) > 0 {
|
||||
out <- &Result{Samples: pendingSamples}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package decoding
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func testProcessor002Process(t tester) {
|
||||
var scenarios = []struct {
|
||||
in string
|
||||
baseLabels LabelSet
|
||||
out Samples
|
||||
err error
|
||||
}{
|
||||
{
|
||||
in: "empty.json",
|
||||
err: fmt.Errorf("EOF"),
|
||||
},
|
||||
{
|
||||
in: "test0_0_1-0_0_2.json",
|
||||
baseLabels: LabelSet{
|
||||
JobLabel: "batch_exporter",
|
||||
},
|
||||
out: Samples{
|
||||
&Sample{
|
||||
Metric: Metric{"service": "zed", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"service": "bar", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"service": "foo", MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||
Value: 25,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.0459814091918713,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 78.48563317257356,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.010000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 15.890724674774395,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.0459814091918713,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 78.48563317257356,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.050000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 15.890724674774395,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 0.6120456642749681,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 97.31798360385088,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.500000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 84.63044031436561,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 1.355915069887731,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 109.89202084295582,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.900000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 160.21100853053224,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||
Value: 1.772733213161236,
|
||||
},
|
||||
&Sample{
|
||||
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||
Value: 109.99626121011262,
|
||||
},
|
||||
&Sample{
|
||||
Metric: Metric{"percentile": "0.990000", MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||
Value: 172.49828748957728,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
inputChannel := make(chan *Result, 1024)
|
||||
|
||||
defer close(inputChannel)
|
||||
|
||||
reader, err := os.Open(path.Join("fixtures", scenario.in))
|
||||
if err != nil {
|
||||
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
|
||||
}
|
||||
|
||||
options := &ProcessOptions{
|
||||
Timestamp: time.Now(),
|
||||
BaseLabels: scenario.baseLabels,
|
||||
}
|
||||
err = Processor002.ProcessSingle(reader, inputChannel, options)
|
||||
if !errorEqual(scenario.err, err) {
|
||||
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
delivered := Samples{}
|
||||
|
||||
for len(inputChannel) != 0 {
|
||||
result := <-inputChannel
|
||||
if result.Err != nil {
|
||||
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
|
||||
}
|
||||
delivered = append(delivered, result.Samples...)
|
||||
}
|
||||
|
||||
if len(delivered) != len(scenario.out) {
|
||||
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
expectedElements := list.New()
|
||||
for _, j := range scenario.out {
|
||||
expectedElements.PushBack(j)
|
||||
}
|
||||
|
||||
for j := 0; j < len(delivered); j++ {
|
||||
actual := delivered[j]
|
||||
|
||||
found := false
|
||||
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
|
||||
candidate := element.Value.(*Sample)
|
||||
|
||||
if candidate.Value != actual.Value {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(candidate.Metric) != len(actual.Metric) {
|
||||
continue
|
||||
}
|
||||
|
||||
labelsMatch := false
|
||||
|
||||
for key, value := range candidate.Metric {
|
||||
actualValue, ok := actual.Metric[key]
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if actualValue == value {
|
||||
labelsMatch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !labelsMatch {
|
||||
continue
|
||||
}
|
||||
|
||||
// XXX: Test time.
|
||||
found = true
|
||||
expectedElements.Remove(element)
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessor002Process(t *testing.T) {
|
||||
testProcessor002Process(t)
|
||||
}
|
||||
|
||||
func BenchmarkProcessor002Process(b *testing.B) {
|
||||
b.StopTimer()
|
||||
|
||||
pre := runtime.MemStats{}
|
||||
runtime.ReadMemStats(&pre)
|
||||
|
||||
b.StartTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
testProcessor002Process(b)
|
||||
}
|
||||
|
||||
post := runtime.MemStats{}
|
||||
runtime.ReadMemStats(&post)
|
||||
|
||||
allocated := post.TotalAlloc - pre.TotalAlloc
|
||||
|
||||
b.Logf("Allocated %d at %f per cycle with %d cycles.", allocated, float64(allocated)/float64(b.N), b.N)
|
||||
}
|
|
@ -4,7 +4,8 @@
|
|||
// Use of this source code is governed by a BSD-style license that can be found
|
||||
// in the LICENSE file.
|
||||
|
||||
// Prometheus' client side metric primitives and telemetry exposition framework.
|
||||
// Package prometheus provides client side metric primitives and a telemetry
|
||||
// exposition framework.
|
||||
//
|
||||
// This package provides both metric primitives and tools for their exposition
|
||||
// to the Prometheus time series collection and computation framework.
|
||||
|
|
Loading…
Reference in New Issue