Merge pull request #25 from prometheus/refactor/ingestion/signatures

Update the Consumer Signature
This commit is contained in:
Matt T. Proud 2013-08-12 04:10:18 -07:00
commit 88e73cf99c
9 changed files with 382 additions and 447 deletions

View File

@ -59,8 +59,7 @@ dependencies: source_path $(GOCC)
$(GO) get github.com/matttproud/gocheck
test: build
$(MAKE) -C prometheus test
$(MAKE) -C examples test
$(GO) test ./...
advice: test
$(MAKE) -C prometheus advice

View File

@ -33,7 +33,7 @@ type metricFamilyProcessor struct{}
// more details.
var MetricFamilyProcessor = new(metricFamilyProcessor)
func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *ProcessOptions) error {
func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, out Ingester, o *ProcessOptions) error {
family := new(dto.MetricFamily)
for {
@ -49,16 +49,24 @@ func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *
switch *family.Type {
case dto.MetricType_COUNTER:
extractCounter(r, o, family)
if err := extractCounter(out, o, family); err != nil {
return err
}
case dto.MetricType_GAUGE:
extractGauge(r, o, family)
if err := extractGauge(out, o, family); err != nil {
return err
}
case dto.MetricType_SUMMARY:
extractSummary(r, o, family)
if err := extractSummary(out, o, family); err != nil {
return err
}
}
}
return nil
}
func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
samples := make(model.Samples, 0, len(f.Metric))
for _, m := range f.Metric {
@ -73,9 +81,6 @@ func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
sample.Metric = model.Metric{}
metric := sample.Metric
for l, v := range o.BaseLabels {
metric[l] = v
}
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
@ -85,12 +90,10 @@ func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
sample.Value = model.SampleValue(m.Counter.GetValue())
}
r <- &Result{
Samples: samples,
}
return out.Ingest(&Result{Samples: samples})
}
func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
samples := make(model.Samples, 0, len(f.Metric))
for _, m := range f.Metric {
@ -105,9 +108,6 @@ func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
sample.Metric = model.Metric{}
metric := sample.Metric
for l, v := range o.BaseLabels {
metric[l] = v
}
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
@ -117,12 +117,10 @@ func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
sample.Value = model.SampleValue(m.Gauge.GetValue())
}
r <- &Result{
Samples: samples,
}
return out.Ingest(&Result{Samples: samples})
}
func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
// BUG(matt): Lack of dumping of sum or count.
samples := make(model.Samples, 0, len(f.Metric))
@ -139,9 +137,6 @@ func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
sample.Metric = model.Metric{}
metric := sample.Metric
for l, v := range o.BaseLabels {
metric[l] = v
}
for _, p := range m.Label {
metric[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
}
@ -154,7 +149,5 @@ func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
}
}
r <- &Result{
Samples: samples,
}
return out.Ingest(&Result{Samples: samples})
}

View File

@ -14,6 +14,7 @@
package extraction
import (
"sort"
"strings"
"testing"
"time"
@ -24,54 +25,37 @@ import (
var testTime = time.Now()
type metricFamilyProcessorScenario struct {
in string
out []*Result
in string
expected, actual []*Result
}
func (s *metricFamilyProcessorScenario) Ingest(r *Result) error {
s.actual = append(s.actual, r)
return nil
}
func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) {
i := strings.NewReader(s.in)
chanSize := 1
if len(s.out) > 0 {
chanSize = len(s.out) * 3
}
r := make(chan *Result, chanSize)
o := &ProcessOptions{
Timestamp: testTime,
BaseLabels: model.LabelSet{"base": "label"},
Timestamp: testTime,
}
err := MetricFamilyProcessor.ProcessSingle(i, r, o)
err := MetricFamilyProcessor.ProcessSingle(i, s, o)
if err != nil {
t.Fatalf("%d. got error: %s", set, err)
}
close(r)
actual := []*Result{}
for e := range r {
actual = append(actual, e)
if len(s.expected) != len(s.actual) {
t.Fatalf("%d. expected length %d, got %d", set, len(s.expected), len(s.actual))
}
if len(actual) != len(s.out) {
t.Fatalf("%d. expected length %d, got %d", set, len(s.out), len(actual))
}
for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples)
sort.Sort(expected.Samples)
for i, expected := range s.out {
if expected.Err != actual[i].Err {
t.Fatalf("%d. expected err of %s, got %s", set, expected.Err, actual[i].Err)
}
if len(expected.Samples) != len(actual[i].Samples) {
t.Fatalf("%d.%d expected %d samples, got %d", set, i, len(expected.Samples), len(actual[i].Samples))
}
for j := 0; j < len(expected.Samples); j++ {
e := expected.Samples[j]
a := actual[i].Samples[j]
if !a.Equal(e) {
t.Fatalf("%d.%d.%d expected %s sample, got %s", set, i, j, e, a)
}
if !expected.equal(s.actual[i]) {
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
}
}
}
@ -83,16 +67,16 @@ func TestMetricFamilyProcessor(t *testing.T) {
},
{
in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@",
out: []*Result{
expected: []*Result{
{
Samples: model.Samples{
&model.Sample{
Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value"},
Metric: model.Metric{"name": "request_count", "some_label_name": "some_label_value"},
Value: -42,
Timestamp: testTime,
},
&model.Sample{
Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value"},
Metric: model.Metric{"name": "request_count", "another_label_name": "another_label_value"},
Value: 84,
Timestamp: testTime,
},
@ -102,21 +86,21 @@ func TestMetricFamilyProcessor(t *testing.T) {
},
{
in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@",
out: []*Result{
expected: []*Result{
{
Samples: model.Samples{
&model.Sample{
Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.99"},
Metric: model.Metric{"name": "request_count", "some_label_name": "some_label_value", "quantile": "0.99"},
Value: -42,
Timestamp: testTime,
},
&model.Sample{
Metric: model.Metric{"base": "label", "name": "request_count", "some_label_name": "some_label_value", "quantile": "0.999"},
Metric: model.Metric{"name": "request_count", "some_label_name": "some_label_value", "quantile": "0.999"},
Value: -84,
Timestamp: testTime,
},
&model.Sample{
Metric: model.Metric{"base": "label", "name": "request_count", "another_label_name": "another_label_value", "quantile": "0.5"},
Metric: model.Metric{"name": "request_count", "another_label_name": "another_label_value", "quantile": "0.5"},
Value: 10,
Timestamp: testTime,
},

View File

@ -25,9 +25,11 @@ import (
type ProcessOptions struct {
// Timestamp is added to each value interpreted from the stream.
Timestamp time.Time
}
// BaseLabels are labels that are accumulated onto each sample, if any.
BaseLabels model.LabelSet
// Ingester consumes result streams in whatever way is desired by the user.
type Ingester interface {
Ingest(*Result) error
}
// Processor is responsible for decoding the actual message responses from
@ -36,7 +38,7 @@ type ProcessOptions struct {
type Processor interface {
// ProcessSingle treats the input as a single self-contained message body and
// transforms it accordingly. It has no support for streaming.
ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error
ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error
}
// Helper function to convert map[string]string into LabelSet.
@ -53,37 +55,42 @@ func labelSet(labels map[string]string) model.LabelSet {
return labelset
}
// Helper function to merge a target's base labels ontop of the labels of an
// exported sample. If a label is already defined in the exported sample, we
// assume that we are scraping an intermediate exporter and attach
// "exporter_"-prefixes to Prometheus' own base labels.
func mergeTargetLabels(entityLabels, targetLabels model.LabelSet) model.LabelSet {
if targetLabels == nil {
targetLabels = model.LabelSet{}
}
result := model.LabelSet{}
for label, value := range entityLabels {
result[label] = value
}
for label, labelValue := range targetLabels {
if _, exists := result[label]; exists {
result[model.ExporterLabelPrefix+label] = labelValue
} else {
result[label] = labelValue
}
}
return result
}
// Result encapsulates the outcome from processing samples from a source.
type Result struct {
Err error
Samples model.Samples
}
func (r *Result) equal(o *Result) bool {
if r == o {
return true
}
if r.Err != o.Err {
if r.Err == nil || o.Err == nil {
return false
}
if r.Err.Error() != o.Err.Error() {
return false
}
}
if len(r.Samples) != len(o.Samples) {
return false
}
for i, mine := range r.Samples {
other := o.Samples[i]
if !mine.Equal(other) {
return false
}
}
return true
}
// A basic interface only useful in testing contexts for dispensing the time
// in a controlled manner.
type instantProvider interface {

View File

@ -55,7 +55,7 @@ type entity001 []struct {
} `json:"metric"`
}
func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error {
// TODO(matt): Replace with plain-jane JSON unmarshalling.
buffer, err := ioutil.ReadAll(in)
if err != nil {
@ -71,15 +71,16 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
pendingSamples := model.Samples{}
for _, entity := range entities {
for _, value := range entity.Metric.Value {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels))
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
labels := labelSet(entity.BaseLabels).Merge(labelSet(value.Labels))
switch entity.Metric.MetricType {
case gauge001, counter001:
sampleValue, ok := value.Value.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
out <- &Result{Err: err}
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
}
@ -95,7 +96,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
sampleValue, ok := value.Value.(map[string]interface{})
if !ok {
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
out <- &Result{Err: err}
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
}
@ -103,7 +106,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
individualValue, ok := percentileValue.(float64)
if !ok {
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
out <- &Result{Err: err}
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
}
@ -127,7 +132,7 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
}
}
if len(pendingSamples) > 0 {
out <- &Result{Samples: pendingSamples}
return out.Ingest(&Result{Samples: pendingSamples})
}
return nil

View File

@ -14,10 +14,10 @@
package extraction
import (
"container/list"
"fmt"
"os"
"path"
"sort"
"testing"
"time"
@ -25,186 +25,157 @@ import (
"github.com/prometheus/client_golang/test"
)
var test001Time = time.Now()
type testProcessor001ProcessScenario struct {
in string
expected, actual []*Result
err error
}
func (s *testProcessor001ProcessScenario) Ingest(r *Result) error {
s.actual = append(s.actual, r)
return nil
}
func (s *testProcessor001ProcessScenario) test(t test.Tester, set int) {
reader, err := os.Open(path.Join("fixtures", s.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err)
}
options := &ProcessOptions{
Timestamp: test001Time,
}
if err := Processor001.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) {
t.Fatalf("%d. expected err of %s, got %s", set, s.err, err)
}
if len(s.actual) != len(s.expected) {
t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual))
}
for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples)
sort.Sort(expected.Samples)
if !expected.equal(s.actual[i]) {
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
}
}
}
func testProcessor001Process(t test.Tester) {
var scenarios = []struct {
in string
baseLabels model.LabelSet
out model.Samples
err error
}{
var scenarios = []testProcessor001ProcessScenario{
{
in: "empty.json",
err: fmt.Errorf("unexpected end of JSON input"),
},
{
in: "test0_0_1-0_0_2.json",
baseLabels: model.LabelSet{
model.JobLabel: "batch_exporter",
},
out: model.Samples{
&model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&model.Sample{
expected: []*Result{
{
Samples: model.Samples{
&model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.6120456642749681,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.6120456642749681,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 97.31798360385088,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 84.63044031436561,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.355915069887731,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.89202084295582,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 160.21100853053224,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.772733213161236,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.99626121011262,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 172.49828748957728,
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 97.31798360385088,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 84.63044031436561,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.355915069887731,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.89202084295582,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 160.21100853053224,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.772733213161236,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.99626121011262,
Timestamp: test001Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 172.49828748957728,
Timestamp: test001Time,
},
},
},
},
},
}
for i, scenario := range scenarios {
inputChannel := make(chan *Result, 1024)
defer close(inputChannel)
reader, err := os.Open(path.Join("fixtures", scenario.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
}
options := &ProcessOptions{
Timestamp: time.Now(),
BaseLabels: scenario.baseLabels,
}
err = Processor001.ProcessSingle(reader, inputChannel, options)
if !test.ErrorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue
}
delivered := model.Samples{}
for len(inputChannel) != 0 {
result := <-inputChannel
if result.Err != nil {
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
}
delivered = append(delivered, result.Samples...)
}
if len(delivered) != len(scenario.out) {
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
continue
}
expectedElements := list.New()
for _, j := range scenario.out {
expectedElements.PushBack(j)
}
for j := 0; j < len(delivered); j++ {
actual := delivered[j]
found := false
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
candidate := element.Value.(*model.Sample)
if candidate.Value != actual.Value {
continue
}
if len(candidate.Metric) != len(actual.Metric) {
continue
}
labelsMatch := false
for key, value := range candidate.Metric {
actualValue, ok := actual.Metric[key]
if !ok {
break
}
if actualValue == value {
labelsMatch = true
break
}
}
if !labelsMatch {
continue
}
// XXX: Test time.
found = true
expectedElements.Remove(element)
}
if !found {
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
}
}
scenario.test(t, i)
}
}

View File

@ -37,7 +37,7 @@ type counter002 struct {
type processor002 struct{}
func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error {
// Processor for telemetry schema version 0.0.2.
// container for telemetry data
var entities []struct {
@ -60,15 +60,15 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
var values []counter002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
out <- &Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
}
for _, counter := range values {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels))
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
labels := labelSet(entity.BaseLabels).Merge(labelSet(counter.Labels))
pendingSamples = append(pendingSamples, &model.Sample{
Metric: model.Metric(labels),
@ -81,17 +81,17 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
var values []histogram002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
out <- &Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
}
for _, histogram := range values {
for percentile, value := range histogram.Values {
entityLabels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels))
entityLabels[model.LabelName("percentile")] = model.LabelValue(percentile)
labels := mergeTargetLabels(entityLabels, o.BaseLabels)
labels := labelSet(entity.BaseLabels).Merge(labelSet(histogram.Labels))
labels[model.LabelName("percentile")] = model.LabelValue(percentile)
pendingSamples = append(pendingSamples, &model.Sample{
Metric: model.Metric(labels),
@ -102,14 +102,15 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
}
default:
out <- &Result{
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
err := fmt.Errorf("Unknown metric type %q", entity.Metric.Type)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
}
}
if len(pendingSamples) > 0 {
out <- &Result{Samples: pendingSamples}
return out.Ingest(&Result{Samples: pendingSamples})
}
return nil

View File

@ -14,11 +14,11 @@
package extraction
import (
"container/list"
"fmt"
"os"
"path"
"runtime"
"sort"
"testing"
"time"
@ -26,186 +26,157 @@ import (
"github.com/prometheus/client_golang/test"
)
var test002Time = time.Now()
type testProcessor002ProcessScenario struct {
in string
expected, actual []*Result
err error
}
func (s *testProcessor002ProcessScenario) Ingest(r *Result) error {
s.actual = append(s.actual, r)
return nil
}
func (s *testProcessor002ProcessScenario) test(t test.Tester, set int) {
reader, err := os.Open(path.Join("fixtures", s.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err)
}
options := &ProcessOptions{
Timestamp: test002Time,
}
if err := Processor002.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) {
t.Fatalf("%d. expected err of %s, got %s", set, s.err, err)
}
if len(s.actual) != len(s.expected) {
t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual))
}
for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples)
sort.Sort(expected.Samples)
if !expected.equal(s.actual[i]) {
t.Fatalf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
}
}
}
func testProcessor002Process(t test.Tester) {
var scenarios = []struct {
in string
baseLabels model.LabelSet
out model.Samples
err error
}{
var scenarios = []testProcessor002ProcessScenario{
{
in: "empty.json",
err: fmt.Errorf("EOF"),
},
{
in: "test0_0_1-0_0_2.json",
baseLabels: model.LabelSet{
model.JobLabel: "batch_exporter",
},
out: model.Samples{
&model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
Value: 25,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&model.Sample{
expected: []*Result{
{
Samples: model.Samples{
&model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.0459814091918713,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 78.48563317257356,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.6120456642749681,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 15.890724674774395,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 0.6120456642749681,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 97.31798360385088,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 84.63044031436561,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.355915069887731,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.89202084295582,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 160.21100853053224,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
Value: 1.772733213161236,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
Value: 109.99626121011262,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
Value: 172.49828748957728,
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 97.31798360385088,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 84.63044031436561,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.355915069887731,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.89202084295582,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 160.21100853053224,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.772733213161236,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.99626121011262,
Timestamp: test002Time,
},
&model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 172.49828748957728,
Timestamp: test002Time,
},
},
},
},
},
}
for i, scenario := range scenarios {
inputChannel := make(chan *Result, 1024)
defer close(inputChannel)
reader, err := os.Open(path.Join("fixtures", scenario.in))
if err != nil {
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
}
options := &ProcessOptions{
Timestamp: time.Now(),
BaseLabels: scenario.baseLabels,
}
err = Processor002.ProcessSingle(reader, inputChannel, options)
if !test.ErrorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue
}
delivered := model.Samples{}
for len(inputChannel) != 0 {
result := <-inputChannel
if result.Err != nil {
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
}
delivered = append(delivered, result.Samples...)
}
if len(delivered) != len(scenario.out) {
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
continue
}
expectedElements := list.New()
for _, j := range scenario.out {
expectedElements.PushBack(j)
}
for j := 0; j < len(delivered); j++ {
actual := delivered[j]
found := false
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
candidate := element.Value.(*model.Sample)
if candidate.Value != actual.Value {
continue
}
if len(candidate.Metric) != len(actual.Metric) {
continue
}
labelsMatch := false
for key, value := range candidate.Metric {
actualValue, ok := actual.Metric[key]
if !ok {
break
}
if actualValue == value {
labelsMatch = true
break
}
}
if !labelsMatch {
continue
}
// XXX: Test time.
found = true
expectedElements.Remove(element)
}
if !found {
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
}
}
scenario.test(t, i)
}
}

View File

@ -24,6 +24,10 @@ type Sample struct {
}
func (s *Sample) Equal(o *Sample) bool {
if s == o {
return true
}
if !s.Metric.Equal(o.Metric) {
return false
}