Support the new protobuf fields.

- Full support for UNTYPED type.

- Receptive support for timestamp_ms (i.e. the processor can process
  it, but the client library cannot yet create it - which is kind of
  intended as timestamps are meant for other things like federation,
  which will need separate support anyway).

Change-Id: I5913164a80089943d49ad58bf86e465a843ab82b
This commit is contained in:
Bjoern Rabenstein 2014-04-16 19:02:52 +02:00
parent ecac33bed0
commit 46fc7a3748
4 changed files with 344 additions and 5 deletions

View File

@ -60,6 +60,10 @@ func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, out Ingester, o *Proc
if err := extractSummary(out, o, family); err != nil {
return err
}
case dto.MetricType_UNTYPED:
if err := extractUntyped(out, o, family); err != nil {
return err
}
}
}
@ -77,7 +81,11 @@ func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
sample := new(model.Sample)
samples = append(samples, sample)
sample.Timestamp = o.Timestamp
if m.TimestampMs != nil {
sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000)
} else {
sample.Timestamp = o.Timestamp
}
sample.Metric = model.Metric{}
metric := sample.Metric
@ -104,7 +112,11 @@ func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
sample := new(model.Sample)
samples = append(samples, sample)
sample.Timestamp = o.Timestamp
if m.TimestampMs != nil {
sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000)
} else {
sample.Timestamp = o.Timestamp
}
sample.Metric = model.Metric{}
metric := sample.Metric
@ -128,11 +140,16 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
continue
}
timestamp := o.Timestamp
if m.TimestampMs != nil {
timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000)
}
for _, q := range m.Summary.Quantile {
sample := new(model.Sample)
samples = append(samples, sample)
sample.Timestamp = o.Timestamp
sample.Timestamp = timestamp
sample.Metric = model.Metric{}
metric := sample.Metric
@ -149,7 +166,7 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
if m.Summary.SampleSum != nil {
sum := new(model.Sample)
sum.Timestamp = o.Timestamp
sum.Timestamp = timestamp
metric := model.Metric{}
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
@ -162,7 +179,7 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
if m.Summary.SampleCount != nil {
count := new(model.Sample)
count.Timestamp = o.Timestamp
count.Timestamp = timestamp
metric := model.Metric{}
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
@ -176,3 +193,34 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
return out.Ingest(&Result{Samples: samples})
}
func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
samples := make(model.Samples, 0, len(f.Metric))
for _, m := range f.Metric {
if m.Untyped == nil {
continue
}
sample := new(model.Sample)
samples = append(samples, sample)
if m.TimestampMs != nil {
sample.Timestamp = model.TimestampFromUnix(*m.TimestampMs / 1000)
} else {
sample.Timestamp = o.Timestamp
}
sample.Metric = model.Metric{}
metric := sample.Metric
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
metric[model.MetricNameLabel] = model.LabelValue(f.GetName())
sample.Value = model.SampleValue(m.Untyped.GetValue())
}
return out.Ingest(&Result{Samples: samples})
}

View File

@ -43,6 +43,7 @@ const (
floatFormat = 'f'
floatPrecision = 6
gaugeTypeValue = "gauge"
untypedTypeValue = "untyped"
histogramTypeValue = "histogram"
typeKey = "type"
valueKey = "value"

138
prometheus/untyped.go Normal file
View File

@ -0,0 +1,138 @@
// Copyright (c) 2013, Prometheus Team
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package prometheus
import (
"encoding/json"
"fmt"
"sync"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/client_model/go"
)
// An Untyped metric represents scalar values without any type implications
// whatsoever. If you need to handle values that cannot be represented by any of
// the existing metric types, you can use an Untyped type and rely on contracts
// outside of Prometheus to ensure that these values are understood correctly.
type Untyped interface {
Metric
Set(labels map[string]string, value float64) float64
}
type untypedVector struct {
Labels map[string]string `json:"labels"`
Value float64 `json:"value"`
}
// NewUntyped returns a newly allocated Untyped metric ready to be used.
func NewUntyped() Untyped {
return &untyped{
values: map[uint64]*untypedVector{},
}
}
type untyped struct {
mutex sync.RWMutex
values map[uint64]*untypedVector
}
func (metric *untyped) String() string {
formatString := "[Untyped %s]"
metric.mutex.RLock()
defer metric.mutex.RUnlock()
return fmt.Sprintf(formatString, metric.values)
}
func (metric *untyped) Set(labels map[string]string, value float64) float64 {
if labels == nil {
labels = blankLabelsSingleton
}
signature := labelValuesToSignature(labels)
metric.mutex.Lock()
defer metric.mutex.Unlock()
if original, ok := metric.values[signature]; ok {
original.Value = value
} else {
metric.values[signature] = &untypedVector{
Labels: labels,
Value: value,
}
}
return value
}
func (metric *untyped) Reset(labels map[string]string) {
signature := labelValuesToSignature(labels)
metric.mutex.Lock()
defer metric.mutex.Unlock()
delete(metric.values, signature)
}
func (metric *untyped) ResetAll() {
metric.mutex.Lock()
defer metric.mutex.Unlock()
for key, value := range metric.values {
for label := range value.Labels {
delete(value.Labels, label)
}
delete(metric.values, key)
}
}
func (metric *untyped) MarshalJSON() ([]byte, error) {
metric.mutex.RLock()
defer metric.mutex.RUnlock()
values := make([]*untypedVector, 0, len(metric.values))
for _, value := range metric.values {
values = append(values, value)
}
return json.Marshal(map[string]interface{}{
typeKey: untypedTypeValue,
valueKey: values,
})
}
func (metric *untyped) dumpChildren(f *dto.MetricFamily) {
metric.mutex.RLock()
defer metric.mutex.RUnlock()
f.Type = dto.MetricType_UNTYPED.Enum()
for _, child := range metric.values {
c := &dto.Untyped{
Value: proto.Float64(child.Value),
}
m := &dto.Metric{
Untyped: c,
}
for name, value := range child.Labels {
p := &dto.LabelPair{
Name: proto.String(name),
Value: proto.String(value),
}
m.Label = append(m.Label, p)
}
f.Metric = append(f.Metric, m)
}
}

152
prometheus/untyped_test.go Normal file
View File

@ -0,0 +1,152 @@
// Copyright (c) 2013, Prometheus Team
// All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package prometheus
import (
"encoding/json"
"testing"
)
func testUntyped(t tester) {
type input struct {
steps []func(g Untyped)
}
type output struct {
value string
}
var scenarios = []struct {
in input
out output
}{
{
in: input{
steps: []func(g Untyped){},
},
out: output{
value: `{"type":"untyped","value":[]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(nil, 1)
},
},
},
out: output{
value: `{"type":"untyped","value":[{"labels":{},"value":1}]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(map[string]string{}, 2)
},
},
},
out: output{
value: `{"type":"untyped","value":[{"labels":{},"value":2}]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(map[string]string{}, 3)
},
func(g Untyped) {
g.Set(map[string]string{}, 5)
},
},
},
out: output{
value: `{"type":"untyped","value":[{"labels":{},"value":5}]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(map[string]string{"handler": "/foo"}, 13)
},
func(g Untyped) {
g.Set(map[string]string{"handler": "/bar"}, 17)
},
func(g Untyped) {
g.Reset(map[string]string{"handler": "/bar"})
},
},
},
out: output{
value: `{"type":"untyped","value":[{"labels":{"handler":"/foo"},"value":13}]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(map[string]string{"handler": "/foo"}, 13)
},
func(g Untyped) {
g.Set(map[string]string{"handler": "/bar"}, 17)
},
func(g Untyped) {
g.ResetAll()
},
},
},
out: output{
value: `{"type":"untyped","value":[]}`,
},
},
{
in: input{
steps: []func(g Untyped){
func(g Untyped) {
g.Set(map[string]string{"handler": "/foo"}, 19)
},
},
},
out: output{
value: `{"type":"untyped","value":[{"labels":{"handler":"/foo"},"value":19}]}`,
},
},
}
for i, scenario := range scenarios {
untyped := NewUntyped()
for _, step := range scenario.in.steps {
step(untyped)
}
bytes, err := json.Marshal(untyped)
if err != nil {
t.Errorf("%d. could not marshal into JSON %s", i, err)
continue
}
asString := string(bytes)
if scenario.out.value != asString {
t.Errorf("%d. expected %q, got %q", i, scenario.out.value, asString)
}
}
}
func TestUntyped(t *testing.T) {
testUntyped(t)
}
func BenchmarkUntyped(b *testing.B) {
for i := 0; i < b.N; i++ {
testUntyped(b)
}
}