From 65a55bbf4ea9660209820b30c633710f1ded1e71 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Sat, 10 Aug 2013 14:07:39 +0200 Subject: [PATCH] Replace Process consumer channel with Ingester. --- Makefile | 3 +- extraction/metricfamilyprocessor.go | 34 +-- extraction/metricfamilyprocessor_test.go | 51 ++-- extraction/processor.go | 37 ++- extraction/processor0_0_1.go | 16 +- extraction/processor0_0_1_test.go | 302 +++++++++++----------- extraction/processor0_0_2.go | 19 +- extraction/processor0_0_2_test.go | 303 +++++++++++------------ model/sample.go | 4 + 9 files changed, 377 insertions(+), 392 deletions(-) diff --git a/Makefile b/Makefile index 3ba28b1..8ba9db0 100644 --- a/Makefile +++ b/Makefile @@ -59,8 +59,7 @@ dependencies: source_path $(GOCC) $(GO) get github.com/matttproud/gocheck test: build - $(MAKE) -C prometheus test - $(MAKE) -C examples test + $(GO) test ./... advice: test $(MAKE) -C prometheus advice diff --git a/extraction/metricfamilyprocessor.go b/extraction/metricfamilyprocessor.go index 15e5186..c0376d9 100644 --- a/extraction/metricfamilyprocessor.go +++ b/extraction/metricfamilyprocessor.go @@ -33,7 +33,7 @@ type metricFamilyProcessor struct{} // more details. var MetricFamilyProcessor = new(metricFamilyProcessor) -func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *ProcessOptions) error { +func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, out Ingester, o *ProcessOptions) error { family := new(dto.MetricFamily) for { @@ -49,16 +49,24 @@ func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o * switch *family.Type { case dto.MetricType_COUNTER: - extractCounter(r, o, family) + if err := extractCounter(out, o, family); err != nil { + return err + } case dto.MetricType_GAUGE: - extractGauge(r, o, family) + if err := extractGauge(out, o, family); err != nil { + return err + } case dto.MetricType_SUMMARY: - extractSummary(r, o, family) + if err := extractSummary(out, o, family); err != nil { + return err + } } } + + return nil } -func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { +func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { samples := make(model.Samples, 0, len(f.Metric)) for _, m := range f.Metric { @@ -85,12 +93,10 @@ func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { sample.Value = model.SampleValue(m.Counter.GetValue()) } - r <- &Result{ - Samples: samples, - } + return out.Ingest(&Result{Samples: samples}) } -func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { +func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { samples := make(model.Samples, 0, len(f.Metric)) for _, m := range f.Metric { @@ -117,12 +123,10 @@ func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { sample.Value = model.SampleValue(m.Gauge.GetValue()) } - r <- &Result{ - Samples: samples, - } + return out.Ingest(&Result{Samples: samples}) } -func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { +func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { // BUG(matt): Lack of dumping of sum or count. samples := make(model.Samples, 0, len(f.Metric)) @@ -154,7 +158,5 @@ func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) { } } - r <- &Result{ - Samples: samples, - } + return out.Ingest(&Result{Samples: samples}) } diff --git a/extraction/metricfamilyprocessor_test.go b/extraction/metricfamilyprocessor_test.go index df23f4e..5e3b38d 100644 --- a/extraction/metricfamilyprocessor_test.go +++ b/extraction/metricfamilyprocessor_test.go @@ -14,6 +14,7 @@ package extraction import ( + "sort" "strings" "testing" "time" @@ -24,54 +25,38 @@ import ( var testTime = time.Now() type metricFamilyProcessorScenario struct { - in string - out []*Result + in string + expected, actual []*Result +} + +func (s *metricFamilyProcessorScenario) Ingest(r *Result) error { + s.actual = append(s.actual, r) + return nil } func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) { i := strings.NewReader(s.in) - chanSize := 1 - if len(s.out) > 0 { - chanSize = len(s.out) * 3 - } - r := make(chan *Result, chanSize) o := &ProcessOptions{ Timestamp: testTime, BaseLabels: model.LabelSet{"base": "label"}, } - err := MetricFamilyProcessor.ProcessSingle(i, r, o) + err := MetricFamilyProcessor.ProcessSingle(i, s, o) if err != nil { t.Fatalf("%d. got error: %s", set, err) } - close(r) - actual := []*Result{} - - for e := range r { - actual = append(actual, e) + if len(s.expected) != len(s.actual) { + t.Fatalf("%d. expected length %d, got %d", set, len(s.expected), len(s.actual)) } - if len(actual) != len(s.out) { - t.Fatalf("%d. expected length %d, got %d", set, len(s.out), len(actual)) - } + for i, expected := range s.expected { + sort.Sort(s.actual[i].Samples) + sort.Sort(expected.Samples) - for i, expected := range s.out { - if expected.Err != actual[i].Err { - t.Fatalf("%d. expected err of %s, got %s", set, expected.Err, actual[i].Err) - } - - if len(expected.Samples) != len(actual[i].Samples) { - t.Fatalf("%d.%d expected %d samples, got %d", set, i, len(expected.Samples), len(actual[i].Samples)) - } - - for j := 0; j < len(expected.Samples); j++ { - e := expected.Samples[j] - a := actual[i].Samples[j] - if !a.Equal(e) { - t.Fatalf("%d.%d.%d expected %s sample, got %s", set, i, j, e, a) - } + if !expected.equal(s.actual[i]) { + t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) } } } @@ -83,7 +68,7 @@ func TestMetricFamilyProcessor(t *testing.T) { }, { in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@", - out: []*Result{ + expected: []*Result{ { Samples: model.Samples{ &model.Sample{ @@ -102,7 +87,7 @@ func TestMetricFamilyProcessor(t *testing.T) { }, { in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@", - out: []*Result{ + expected: []*Result{ { Samples: model.Samples{ &model.Sample{ diff --git a/extraction/processor.go b/extraction/processor.go index da4b6d9..68c0861 100644 --- a/extraction/processor.go +++ b/extraction/processor.go @@ -30,13 +30,18 @@ type ProcessOptions struct { BaseLabels model.LabelSet } +// Ingester consumes result streams in whatever way is desired by the user. +type Ingester interface { + Ingest(*Result) error +} + // Processor is responsible for decoding the actual message responses from // stream into a format that can be consumed with the end result written // to the results channel. type Processor interface { // ProcessSingle treats the input as a single self-contained message body and // transforms it accordingly. It has no support for streaming. - ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error + ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error } // Helper function to convert map[string]string into LabelSet. @@ -84,6 +89,36 @@ type Result struct { Samples model.Samples } +func (r *Result) equal(o *Result) bool { + if r == o { + return true + } + + if r.Err != o.Err { + if r.Err == nil || o.Err == nil { + return false + } + + if r.Err.Error() != o.Err.Error() { + return false + } + } + + if len(r.Samples) != len(o.Samples) { + return false + } + + for i, mine := range r.Samples { + other := o.Samples[i] + + if !mine.Equal(other) { + return false + } + } + + return true +} + // A basic interface only useful in testing contexts for dispensing the time // in a controlled manner. type instantProvider interface { diff --git a/extraction/processor0_0_1.go b/extraction/processor0_0_1.go index 178ae61..62d3076 100644 --- a/extraction/processor0_0_1.go +++ b/extraction/processor0_0_1.go @@ -55,7 +55,7 @@ type entity001 []struct { } `json:"metric"` } -func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { +func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error { // TODO(matt): Replace with plain-jane JSON unmarshalling. buffer, err := ioutil.ReadAll(in) if err != nil { @@ -79,7 +79,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces sampleValue, ok := value.Value.(float64) if !ok { err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value) - out <- &Result{Err: err} + if err := out.Ingest(&Result{Err: err}); err != nil { + return err + } continue } @@ -95,7 +97,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces sampleValue, ok := value.Value.(map[string]interface{}) if !ok { err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value) - out <- &Result{Err: err} + if err := out.Ingest(&Result{Err: err}); err != nil { + return err + } continue } @@ -103,7 +107,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces individualValue, ok := percentileValue.(float64) if !ok { err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue) - out <- &Result{Err: err} + if err := out.Ingest(&Result{Err: err}); err != nil { + return err + } continue } @@ -127,7 +133,7 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces } } if len(pendingSamples) > 0 { - out <- &Result{Samples: pendingSamples} + return out.Ingest(&Result{Samples: pendingSamples}) } return nil diff --git a/extraction/processor0_0_1_test.go b/extraction/processor0_0_1_test.go index 1593a87..ca248be 100644 --- a/extraction/processor0_0_1_test.go +++ b/extraction/processor0_0_1_test.go @@ -14,10 +14,10 @@ package extraction import ( - "container/list" "fmt" "os" "path" + "sort" "testing" "time" @@ -25,13 +25,50 @@ import ( "github.com/prometheus/client_golang/test" ) +var test001Time = time.Now() + +type testProcessor001ProcessScenario struct { + in string + baseLabels model.LabelSet + expected, actual []*Result + err error +} + +func (s *testProcessor001ProcessScenario) Ingest(r *Result) error { + s.actual = append(s.actual, r) + return nil +} + +func (s *testProcessor001ProcessScenario) test(t test.Tester, set int) { + reader, err := os.Open(path.Join("fixtures", s.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err) + } + + options := &ProcessOptions{ + Timestamp: test001Time, + BaseLabels: s.baseLabels, + } + if err := Processor001.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) { + t.Fatalf("%d. expected err of %s, got %s", set, s.err, err) + } + + if len(s.actual) != len(s.expected) { + t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual)) + } + + for i, expected := range s.expected { + sort.Sort(s.actual[i].Samples) + sort.Sort(expected.Samples) + + if !expected.equal(s.actual[i]) { + t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) + } + } +} + func testProcessor001Process(t test.Tester) { - var scenarios = []struct { - in string - baseLabels model.LabelSet - out model.Samples - err error - }{ + var scenarios = []testProcessor001ProcessScenario{ { in: "empty.json", err: fmt.Errorf("unexpected end of JSON input"), @@ -41,170 +78,109 @@ func testProcessor001Process(t test.Tester) { baseLabels: model.LabelSet{ model.JobLabel: "batch_exporter", }, - out: model.Samples{ - &model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.0459814091918713, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 78.48563317257356, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 15.890724674774395, - }, - &model.Sample{ + expected: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + Timestamp: test001Time, + }, + &model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.0459814091918713, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 78.48563317257356, - }, - &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + Timestamp: test001Time, + }, + &model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 15.890724674774395, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.6120456642749681, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 97.31798360385088, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 84.63044031436561, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 1.355915069887731, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 109.89202084295582, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 160.21100853053224, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 1.772733213161236, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 109.99626121011262, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 172.49828748957728, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + Timestamp: test001Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + Timestamp: test001Time, + }, + }, }, }, }, } for i, scenario := range scenarios { - inputChannel := make(chan *Result, 1024) - - defer close(inputChannel) - - reader, err := os.Open(path.Join("fixtures", scenario.in)) - if err != nil { - t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) - } - - options := &ProcessOptions{ - Timestamp: time.Now(), - BaseLabels: scenario.baseLabels, - } - err = Processor001.ProcessSingle(reader, inputChannel, options) - if !test.ErrorEqual(scenario.err, err) { - t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) - continue - } - - delivered := model.Samples{} - - for len(inputChannel) != 0 { - result := <-inputChannel - if result.Err != nil { - t.Fatalf("%d. expected no error, got: %s", i, result.Err) - } - delivered = append(delivered, result.Samples...) - } - - if len(delivered) != len(scenario.out) { - t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) - - continue - } - - expectedElements := list.New() - for _, j := range scenario.out { - expectedElements.PushBack(j) - } - - for j := 0; j < len(delivered); j++ { - actual := delivered[j] - - found := false - for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { - candidate := element.Value.(*model.Sample) - - if candidate.Value != actual.Value { - continue - } - - if len(candidate.Metric) != len(actual.Metric) { - continue - } - - labelsMatch := false - - for key, value := range candidate.Metric { - actualValue, ok := actual.Metric[key] - if !ok { - break - } - if actualValue == value { - labelsMatch = true - break - } - } - - if !labelsMatch { - continue - } - - // XXX: Test time. - found = true - expectedElements.Remove(element) - } - - if !found { - t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) - } - } + scenario.test(t, i) } } diff --git a/extraction/processor0_0_2.go b/extraction/processor0_0_2.go index 2c7b566..2a278d9 100644 --- a/extraction/processor0_0_2.go +++ b/extraction/processor0_0_2.go @@ -37,7 +37,7 @@ type counter002 struct { type processor002 struct{} -func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error { +func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error { // Processor for telemetry schema version 0.0.2. // container for telemetry data var entities []struct { @@ -60,8 +60,9 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces var values []counter002 if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { - out <- &Result{ - Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err) + if err := out.Ingest(&Result{Err: err}); err != nil { + return err } continue } @@ -81,8 +82,9 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces var values []histogram002 if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { - out <- &Result{ - Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err), + err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err) + if err := out.Ingest(&Result{Err: err}); err != nil { + return err } continue } @@ -102,14 +104,15 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces } default: - out <- &Result{ - Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type), + err := fmt.Errorf("Unknown metric type %q", entity.Metric.Type) + if err := out.Ingest(&Result{Err: err}); err != nil { + return err } } } if len(pendingSamples) > 0 { - out <- &Result{Samples: pendingSamples} + return out.Ingest(&Result{Samples: pendingSamples}) } return nil diff --git a/extraction/processor0_0_2_test.go b/extraction/processor0_0_2_test.go index 5d20751..8b5d77e 100644 --- a/extraction/processor0_0_2_test.go +++ b/extraction/processor0_0_2_test.go @@ -14,11 +14,11 @@ package extraction import ( - "container/list" "fmt" "os" "path" "runtime" + "sort" "testing" "time" @@ -26,13 +26,50 @@ import ( "github.com/prometheus/client_golang/test" ) +var test002Time = time.Now() + +type testProcessor002ProcessScenario struct { + in string + baseLabels model.LabelSet + expected, actual []*Result + err error +} + +func (s *testProcessor002ProcessScenario) Ingest(r *Result) error { + s.actual = append(s.actual, r) + return nil +} + +func (s *testProcessor002ProcessScenario) test(t test.Tester, set int) { + reader, err := os.Open(path.Join("fixtures", s.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err) + } + + options := &ProcessOptions{ + Timestamp: test002Time, + BaseLabels: s.baseLabels, + } + if err := Processor002.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) { + t.Fatalf("%d. expected err of %s, got %s", set, s.err, err) + } + + if len(s.actual) != len(s.expected) { + t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual)) + } + + for i, expected := range s.expected { + sort.Sort(s.actual[i].Samples) + sort.Sort(expected.Samples) + + if !expected.equal(s.actual[i]) { + t.Fatalf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) + } + } +} + func testProcessor002Process(t test.Tester) { - var scenarios = []struct { - in string - baseLabels model.LabelSet - out model.Samples - err error - }{ + var scenarios = []testProcessor002ProcessScenario{ { in: "empty.json", err: fmt.Errorf("EOF"), @@ -42,170 +79,108 @@ func testProcessor002Process(t test.Tester) { baseLabels: model.LabelSet{ model.JobLabel: "batch_exporter", }, - out: model.Samples{ - &model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, - Value: 25, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.0459814091918713, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 78.48563317257356, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 15.890724674774395, - }, - &model.Sample{ + expected: []*Result{ + { + Samples: model.Samples{ + &model.Sample{ + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, + Value: 25, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.0459814091918713, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 78.48563317257356, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 15.890724674774395, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 0.6120456642749681, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 97.31798360385088, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 84.63044031436561, + Timestamp: test002Time, + }, + &model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.0459814091918713, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 78.48563317257356, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 15.890724674774395, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 0.6120456642749681, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 97.31798360385088, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 84.63044031436561, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 1.355915069887731, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 109.89202084295582, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 160.21100853053224, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, - Value: 1.772733213161236, - }, - &model.Sample{ - - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, - Value: 109.99626121011262, - }, - &model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, - Value: 172.49828748957728, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.355915069887731, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.89202084295582, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 160.21100853053224, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, + Value: 1.772733213161236, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, + Value: 109.99626121011262, + Timestamp: test002Time, + }, + &model.Sample{ + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, + Value: 172.49828748957728, + Timestamp: test002Time, + }, + }, }, }, }, } for i, scenario := range scenarios { - inputChannel := make(chan *Result, 1024) - - defer close(inputChannel) - - reader, err := os.Open(path.Join("fixtures", scenario.in)) - if err != nil { - t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err) - } - - options := &ProcessOptions{ - Timestamp: time.Now(), - BaseLabels: scenario.baseLabels, - } - err = Processor002.ProcessSingle(reader, inputChannel, options) - if !test.ErrorEqual(scenario.err, err) { - t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) - continue - } - - delivered := model.Samples{} - - for len(inputChannel) != 0 { - result := <-inputChannel - if result.Err != nil { - t.Fatalf("%d. expected no error, got: %s", i, result.Err) - } - delivered = append(delivered, result.Samples...) - } - - if len(delivered) != len(scenario.out) { - t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) - - continue - } - - expectedElements := list.New() - for _, j := range scenario.out { - expectedElements.PushBack(j) - } - - for j := 0; j < len(delivered); j++ { - actual := delivered[j] - - found := false - for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { - candidate := element.Value.(*model.Sample) - - if candidate.Value != actual.Value { - continue - } - - if len(candidate.Metric) != len(actual.Metric) { - continue - } - - labelsMatch := false - - for key, value := range candidate.Metric { - actualValue, ok := actual.Metric[key] - if !ok { - break - } - if actualValue == value { - labelsMatch = true - break - } - } - - if !labelsMatch { - continue - } - - // XXX: Test time. - found = true - expectedElements.Remove(element) - } - - if !found { - t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual) - } - } + scenario.test(t, i) } } diff --git a/model/sample.go b/model/sample.go index 845d5b2..8f5db32 100644 --- a/model/sample.go +++ b/model/sample.go @@ -24,6 +24,10 @@ type Sample struct { } func (s *Sample) Equal(o *Sample) bool { + if s == o { + return true + } + if !s.Metric.Equal(o.Metric) { return false }