From 46fc7a3748977a79a4bc67bf418b53c9fd7a1044 Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Wed, 16 Apr 2014 19:02:52 +0200 Subject: [PATCH] Support the new protobuf fields. - Full support for UNTYPED type. - Receptive support for timestamp_ms (i.e. the processor can process it, but the client library cannot yet create it - which is kind of intended as timestamps are meant for other things like federation, which will need separate support anyway). Change-Id: I5913164a80089943d49ad58bf86e465a843ab82b --- extraction/metricfamilyprocessor.go | 58 ++++++++++- prometheus/constants.go | 1 + prometheus/untyped.go | 138 +++++++++++++++++++++++++ prometheus/untyped_test.go | 152 ++++++++++++++++++++++++++++ 4 files changed, 344 insertions(+), 5 deletions(-) create mode 100644 prometheus/untyped.go create mode 100644 prometheus/untyped_test.go diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go index 6fa871f..3f8a5c6 100644 --- a/extraction/metricfamilyprocessor.go +++ b/extraction/metricfamilyprocessor.go @@ -60,6 +60,10 @@ func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, out Ingester, o *Proc if err := extractSummary(out, o, family); err != nil { return err } + case dto.MetricType_UNTYPED: + if err := extractUntyped(out, o, family); err != nil { + return err + } } } @@ -77,7 +81,11 @@ func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error sample := new(model.Sample) samples = append(samples, sample) - sample.Timestamp = o.Timestamp + if m.TimestampMs != nil { + sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000) + } else { + sample.Timestamp = o.Timestamp + } sample.Metric = model.Metric{} metric := sample.Metric @@ -104,7 +112,11 @@ func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { sample := new(model.Sample) samples = append(samples, sample) - sample.Timestamp = o.Timestamp + if m.TimestampMs != nil { + sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000) + } else { + sample.Timestamp = o.Timestamp + } sample.Metric = model.Metric{} metric := sample.Metric @@ -128,11 +140,16 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error continue } + timestamp := o.Timestamp + if m.TimestampMs != nil { + timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000) + } + for _, q := range m.Summary.Quantile { sample := new(model.Sample) samples = append(samples, sample) - sample.Timestamp = o.Timestamp + sample.Timestamp = timestamp sample.Metric = model.Metric{} metric := sample.Metric @@ -149,7 +166,7 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error if m.Summary.SampleSum != nil { sum := new(model.Sample) - sum.Timestamp = o.Timestamp + sum.Timestamp = timestamp metric := model.Metric{} for _, p := range m.Label { metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) @@ -162,7 +179,7 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error if m.Summary.SampleCount != nil { count := new(model.Sample) - count.Timestamp = o.Timestamp + count.Timestamp = timestamp metric := model.Metric{} for _, p := range m.Label { metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) @@ -176,3 +193,34 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error return out.Ingest(&Result{Samples: samples}) } + +func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { + samples := make(model.Samples, 0, len(f.Metric)) + + for _, m := range f.Metric { + if m.Untyped == nil { + continue + } + + sample := new(model.Sample) + samples = append(samples, sample) + + if m.TimestampMs != nil { + sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000) + } else { + sample.Timestamp = o.Timestamp + } + sample.Metric = model.Metric{} + metric := sample.Metric + + for _, p := range m.Label { + metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) + } + + metric[model.MetricNameLabel] = model.LabelValue(f.GetName()) + + sample.Value = model.SampleValue(m.Untyped.GetValue()) + } + + return out.Ingest(&Result{Samples: samples}) +} diff --git a/prometheus/constants.go b/prometheus/constants.go index 09b5634..6227553 100644 --- a/prometheus/constants.go +++ b/prometheus/constants.go @@ -43,6 +43,7 @@ const ( floatFormat = 'f' floatPrecision = 6 gaugeTypeValue = "gauge" + untypedTypeValue = "untyped" histogramTypeValue = "histogram" typeKey = "type" valueKey = "value" diff --git a/prometheus/untyped.go b/prometheus/untyped.go new file mode 100644 index 0000000..e0a40c9 --- /dev/null +++ b/prometheus/untyped.go @@ -0,0 +1,138 @@ +// Copyright (c) 2013, Prometheus Team +// All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package prometheus + +import ( + "encoding/json" + "fmt" + "sync" + + "code.google.com/p/goprotobuf/proto" + + dto "github.com/prometheus/client_model/go" +) + +// An Untyped metric represents scalar values without any type implications +// whatsoever. If you need to handle values that cannot be represented by any of +// the existing metric types, you can use an Untyped type and rely on contracts +// outside of Prometheus to ensure that these values are understood correctly. +type Untyped interface { + Metric + Set(labels map[string]string, value float64) float64 +} + +type untypedVector struct { + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` +} + +// NewUntyped returns a newly allocated Untyped metric ready to be used. +func NewUntyped() Untyped { + return &untyped{ + values: map[uint64]*untypedVector{}, + } +} + +type untyped struct { + mutex sync.RWMutex + values map[uint64]*untypedVector +} + +func (metric *untyped) String() string { + formatString := "[Untyped %s]" + + metric.mutex.RLock() + defer metric.mutex.RUnlock() + + return fmt.Sprintf(formatString, metric.values) +} + +func (metric *untyped) Set(labels map[string]string, value float64) float64 { + if labels == nil { + labels = blankLabelsSingleton + } + + signature := labelValuesToSignature(labels) + + metric.mutex.Lock() + defer metric.mutex.Unlock() + + if original, ok := metric.values[signature]; ok { + original.Value = value + } else { + metric.values[signature] = &untypedVector{ + Labels: labels, + Value: value, + } + } + + return value +} + +func (metric *untyped) Reset(labels map[string]string) { + signature := labelValuesToSignature(labels) + + metric.mutex.Lock() + defer metric.mutex.Unlock() + + delete(metric.values, signature) +} + +func (metric *untyped) ResetAll() { + metric.mutex.Lock() + defer metric.mutex.Unlock() + + for key, value := range metric.values { + for label := range value.Labels { + delete(value.Labels, label) + } + delete(metric.values, key) + } +} + +func (metric *untyped) MarshalJSON() ([]byte, error) { + metric.mutex.RLock() + defer metric.mutex.RUnlock() + + values := make([]*untypedVector, 0, len(metric.values)) + for _, value := range metric.values { + values = append(values, value) + } + + return json.Marshal(map[string]interface{}{ + typeKey: untypedTypeValue, + valueKey: values, + }) +} + +func (metric *untyped) dumpChildren(f *dto.MetricFamily) { + metric.mutex.RLock() + defer metric.mutex.RUnlock() + + f.Type = dto.MetricType_UNTYPED.Enum() + + for _, child := range metric.values { + c := &dto.Untyped{ + Value: proto.Float64(child.Value), + } + + m := &dto.Metric{ + Untyped: c, + } + + for name, value := range child.Labels { + p := &dto.LabelPair{ + Name: proto.String(name), + Value: proto.String(value), + } + + m.Label = append(m.Label, p) + } + + f.Metric = append(f.Metric, m) + } +} diff --git a/prometheus/untyped_test.go b/prometheus/untyped_test.go new file mode 100644 index 0000000..f39cd87 --- /dev/null +++ b/prometheus/untyped_test.go @@ -0,0 +1,152 @@ +// Copyright (c) 2013, Prometheus Team +// All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package prometheus + +import ( + "encoding/json" + "testing" +) + +func testUntyped(t tester) { + type input struct { + steps []func(g Untyped) + } + type output struct { + value string + } + + var scenarios = []struct { + in input + out output + }{ + { + in: input{ + steps: []func(g Untyped){}, + }, + out: output{ + value: `{"type":"untyped","value":[]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(nil, 1) + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[{"labels":{},"value":1}]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(map[string]string{}, 2) + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[{"labels":{},"value":2}]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(map[string]string{}, 3) + }, + func(g Untyped) { + g.Set(map[string]string{}, 5) + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[{"labels":{},"value":5}]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(map[string]string{"handler": "/foo"}, 13) + }, + func(g Untyped) { + g.Set(map[string]string{"handler": "/bar"}, 17) + }, + func(g Untyped) { + g.Reset(map[string]string{"handler": "/bar"}) + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[{"labels":{"handler":"/foo"},"value":13}]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(map[string]string{"handler": "/foo"}, 13) + }, + func(g Untyped) { + g.Set(map[string]string{"handler": "/bar"}, 17) + }, + func(g Untyped) { + g.ResetAll() + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[]}`, + }, + }, + { + in: input{ + steps: []func(g Untyped){ + func(g Untyped) { + g.Set(map[string]string{"handler": "/foo"}, 19) + }, + }, + }, + out: output{ + value: `{"type":"untyped","value":[{"labels":{"handler":"/foo"},"value":19}]}`, + }, + }, + } + + for i, scenario := range scenarios { + untyped := NewUntyped() + + for _, step := range scenario.in.steps { + step(untyped) + } + + bytes, err := json.Marshal(untyped) + if err != nil { + t.Errorf("%d. could not marshal into JSON %s", i, err) + continue + } + + asString := string(bytes) + + if scenario.out.value != asString { + t.Errorf("%d. expected %q, got %q", i, scenario.out.value, asString) + } + } +} + +func TestUntyped(t *testing.T) { + testUntyped(t) +} + +func BenchmarkUntyped(b *testing.B) { + for i := 0; i < b.N; i++ { + testUntyped(b) + } +}