Replace Process consumer channel with Ingester.
This commit is contained in:
parent
b92fd1caaa
commit
65a55bbf4e
3
Makefile
3
Makefile
|
@ -59,8 +59,7 @@ dependencies: source_path $(GOCC)
|
||||||
$(GO) get github.com/matttproud/gocheck
|
$(GO) get github.com/matttproud/gocheck
|
||||||
|
|
||||||
test: build
|
test: build
|
||||||
$(MAKE) -C prometheus test
|
$(GO) test ./...
|
||||||
$(MAKE) -C examples test
|
|
||||||
|
|
||||||
advice: test
|
advice: test
|
||||||
$(MAKE) -C prometheus advice
|
$(MAKE) -C prometheus advice
|
||||||
|
|
|
@ -33,7 +33,7 @@ type metricFamilyProcessor struct{}
|
||||||
// more details.
|
// more details.
|
||||||
var MetricFamilyProcessor = new(metricFamilyProcessor)
|
var MetricFamilyProcessor = new(metricFamilyProcessor)
|
||||||
|
|
||||||
func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *ProcessOptions) error {
|
func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, out Ingester, o *ProcessOptions) error {
|
||||||
family := new(dto.MetricFamily)
|
family := new(dto.MetricFamily)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -49,16 +49,24 @@ func (m *metricFamilyProcessor) ProcessSingle(i io.Reader, r chan<- *Result, o *
|
||||||
|
|
||||||
switch *family.Type {
|
switch *family.Type {
|
||||||
case dto.MetricType_COUNTER:
|
case dto.MetricType_COUNTER:
|
||||||
extractCounter(r, o, family)
|
if err := extractCounter(out, o, family); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
case dto.MetricType_GAUGE:
|
case dto.MetricType_GAUGE:
|
||||||
extractGauge(r, o, family)
|
if err := extractGauge(out, o, family); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
case dto.MetricType_SUMMARY:
|
case dto.MetricType_SUMMARY:
|
||||||
extractSummary(r, o, family)
|
if err := extractSummary(out, o, family); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
|
||||||
samples := make(model.Samples, 0, len(f.Metric))
|
samples := make(model.Samples, 0, len(f.Metric))
|
||||||
|
|
||||||
for _, m := range f.Metric {
|
for _, m := range f.Metric {
|
||||||
|
@ -85,12 +93,10 @@ func extractCounter(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
||||||
sample.Value = model.SampleValue(m.Counter.GetValue())
|
sample.Value = model.SampleValue(m.Counter.GetValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
r <- &Result{
|
return out.Ingest(&Result{Samples: samples})
|
||||||
Samples: samples,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
|
||||||
samples := make(model.Samples, 0, len(f.Metric))
|
samples := make(model.Samples, 0, len(f.Metric))
|
||||||
|
|
||||||
for _, m := range f.Metric {
|
for _, m := range f.Metric {
|
||||||
|
@ -117,12 +123,10 @@ func extractGauge(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
||||||
sample.Value = model.SampleValue(m.Gauge.GetValue())
|
sample.Value = model.SampleValue(m.Gauge.GetValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
r <- &Result{
|
return out.Ingest(&Result{Samples: samples})
|
||||||
Samples: samples,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
|
||||||
// BUG(matt): Lack of dumping of sum or count.
|
// BUG(matt): Lack of dumping of sum or count.
|
||||||
samples := make(model.Samples, 0, len(f.Metric))
|
samples := make(model.Samples, 0, len(f.Metric))
|
||||||
|
|
||||||
|
@ -154,7 +158,5 @@ func extractSummary(r chan<- *Result, o *ProcessOptions, f *dto.MetricFamily) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r <- &Result{
|
return out.Ingest(&Result{Samples: samples})
|
||||||
Samples: samples,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package extraction
|
package extraction
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -25,53 +26,37 @@ var testTime = time.Now()
|
||||||
|
|
||||||
type metricFamilyProcessorScenario struct {
|
type metricFamilyProcessorScenario struct {
|
||||||
in string
|
in string
|
||||||
out []*Result
|
expected, actual []*Result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *metricFamilyProcessorScenario) Ingest(r *Result) error {
|
||||||
|
s.actual = append(s.actual, r)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) {
|
func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) {
|
||||||
i := strings.NewReader(s.in)
|
i := strings.NewReader(s.in)
|
||||||
chanSize := 1
|
|
||||||
if len(s.out) > 0 {
|
|
||||||
chanSize = len(s.out) * 3
|
|
||||||
}
|
|
||||||
r := make(chan *Result, chanSize)
|
|
||||||
|
|
||||||
o := &ProcessOptions{
|
o := &ProcessOptions{
|
||||||
Timestamp: testTime,
|
Timestamp: testTime,
|
||||||
BaseLabels: model.LabelSet{"base": "label"},
|
BaseLabels: model.LabelSet{"base": "label"},
|
||||||
}
|
}
|
||||||
|
|
||||||
err := MetricFamilyProcessor.ProcessSingle(i, r, o)
|
err := MetricFamilyProcessor.ProcessSingle(i, s, o)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d. got error: %s", set, err)
|
t.Fatalf("%d. got error: %s", set, err)
|
||||||
}
|
}
|
||||||
close(r)
|
|
||||||
|
|
||||||
actual := []*Result{}
|
if len(s.expected) != len(s.actual) {
|
||||||
|
t.Fatalf("%d. expected length %d, got %d", set, len(s.expected), len(s.actual))
|
||||||
for e := range r {
|
|
||||||
actual = append(actual, e)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(actual) != len(s.out) {
|
for i, expected := range s.expected {
|
||||||
t.Fatalf("%d. expected length %d, got %d", set, len(s.out), len(actual))
|
sort.Sort(s.actual[i].Samples)
|
||||||
}
|
sort.Sort(expected.Samples)
|
||||||
|
|
||||||
for i, expected := range s.out {
|
if !expected.equal(s.actual[i]) {
|
||||||
if expected.Err != actual[i].Err {
|
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
|
||||||
t.Fatalf("%d. expected err of %s, got %s", set, expected.Err, actual[i].Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(expected.Samples) != len(actual[i].Samples) {
|
|
||||||
t.Fatalf("%d.%d expected %d samples, got %d", set, i, len(expected.Samples), len(actual[i].Samples))
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := 0; j < len(expected.Samples); j++ {
|
|
||||||
e := expected.Samples[j]
|
|
||||||
a := actual[i].Samples[j]
|
|
||||||
if !a.Equal(e) {
|
|
||||||
t.Fatalf("%d.%d.%d expected %s sample, got %s", set, i, j, e, a)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,7 +68,7 @@ func TestMetricFamilyProcessor(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@",
|
in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@",
|
||||||
out: []*Result{
|
expected: []*Result{
|
||||||
{
|
{
|
||||||
Samples: model.Samples{
|
Samples: model.Samples{
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
@ -102,7 +87,7 @@ func TestMetricFamilyProcessor(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@",
|
in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@",
|
||||||
out: []*Result{
|
expected: []*Result{
|
||||||
{
|
{
|
||||||
Samples: model.Samples{
|
Samples: model.Samples{
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
|
@ -30,13 +30,18 @@ type ProcessOptions struct {
|
||||||
BaseLabels model.LabelSet
|
BaseLabels model.LabelSet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ingester consumes result streams in whatever way is desired by the user.
|
||||||
|
type Ingester interface {
|
||||||
|
Ingest(*Result) error
|
||||||
|
}
|
||||||
|
|
||||||
// Processor is responsible for decoding the actual message responses from
|
// Processor is responsible for decoding the actual message responses from
|
||||||
// stream into a format that can be consumed with the end result written
|
// stream into a format that can be consumed with the end result written
|
||||||
// to the results channel.
|
// to the results channel.
|
||||||
type Processor interface {
|
type Processor interface {
|
||||||
// ProcessSingle treats the input as a single self-contained message body and
|
// ProcessSingle treats the input as a single self-contained message body and
|
||||||
// transforms it accordingly. It has no support for streaming.
|
// transforms it accordingly. It has no support for streaming.
|
||||||
ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error
|
ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper function to convert map[string]string into LabelSet.
|
// Helper function to convert map[string]string into LabelSet.
|
||||||
|
@ -84,6 +89,36 @@ type Result struct {
|
||||||
Samples model.Samples
|
Samples model.Samples
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Result) equal(o *Result) bool {
|
||||||
|
if r == o {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Err != o.Err {
|
||||||
|
if r.Err == nil || o.Err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Err.Error() != o.Err.Error() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(r.Samples) != len(o.Samples) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, mine := range r.Samples {
|
||||||
|
other := o.Samples[i]
|
||||||
|
|
||||||
|
if !mine.Equal(other) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// A basic interface only useful in testing contexts for dispensing the time
|
// A basic interface only useful in testing contexts for dispensing the time
|
||||||
// in a controlled manner.
|
// in a controlled manner.
|
||||||
type instantProvider interface {
|
type instantProvider interface {
|
||||||
|
|
|
@ -55,7 +55,7 @@ type entity001 []struct {
|
||||||
} `json:"metric"`
|
} `json:"metric"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
|
func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error {
|
||||||
// TODO(matt): Replace with plain-jane JSON unmarshalling.
|
// TODO(matt): Replace with plain-jane JSON unmarshalling.
|
||||||
buffer, err := ioutil.ReadAll(in)
|
buffer, err := ioutil.ReadAll(in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -79,7 +79,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
sampleValue, ok := value.Value.(float64)
|
sampleValue, ok := value.Value.(float64)
|
||||||
if !ok {
|
if !ok {
|
||||||
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
|
err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value)
|
||||||
out <- &Result{Err: err}
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +97,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
sampleValue, ok := value.Value.(map[string]interface{})
|
sampleValue, ok := value.Value.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
|
err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value)
|
||||||
out <- &Result{Err: err}
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,7 +107,9 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
individualValue, ok := percentileValue.(float64)
|
individualValue, ok := percentileValue.(float64)
|
||||||
if !ok {
|
if !ok {
|
||||||
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
|
err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue)
|
||||||
out <- &Result{Err: err}
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +133,7 @@ func (p *processor001) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(pendingSamples) > 0 {
|
if len(pendingSamples) > 0 {
|
||||||
out <- &Result{Samples: pendingSamples}
|
return out.Ingest(&Result{Samples: pendingSamples})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -14,10 +14,10 @@
|
||||||
package extraction
|
package extraction
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -25,13 +25,50 @@ import (
|
||||||
"github.com/prometheus/client_golang/test"
|
"github.com/prometheus/client_golang/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testProcessor001Process(t test.Tester) {
|
var test001Time = time.Now()
|
||||||
var scenarios = []struct {
|
|
||||||
|
type testProcessor001ProcessScenario struct {
|
||||||
in string
|
in string
|
||||||
baseLabels model.LabelSet
|
baseLabels model.LabelSet
|
||||||
out model.Samples
|
expected, actual []*Result
|
||||||
err error
|
err error
|
||||||
}{
|
}
|
||||||
|
|
||||||
|
func (s *testProcessor001ProcessScenario) Ingest(r *Result) error {
|
||||||
|
s.actual = append(s.actual, r)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testProcessor001ProcessScenario) test(t test.Tester, set int) {
|
||||||
|
reader, err := os.Open(path.Join("fixtures", s.in))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
options := &ProcessOptions{
|
||||||
|
Timestamp: test001Time,
|
||||||
|
BaseLabels: s.baseLabels,
|
||||||
|
}
|
||||||
|
if err := Processor001.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) {
|
||||||
|
t.Fatalf("%d. expected err of %s, got %s", set, s.err, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.actual) != len(s.expected) {
|
||||||
|
t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, expected := range s.expected {
|
||||||
|
sort.Sort(s.actual[i].Samples)
|
||||||
|
sort.Sort(expected.Samples)
|
||||||
|
|
||||||
|
if !expected.equal(s.actual[i]) {
|
||||||
|
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testProcessor001Process(t test.Tester) {
|
||||||
|
var scenarios = []testProcessor001ProcessScenario{
|
||||||
{
|
{
|
||||||
in: "empty.json",
|
in: "empty.json",
|
||||||
err: fmt.Errorf("unexpected end of JSON input"),
|
err: fmt.Errorf("unexpected end of JSON input"),
|
||||||
|
@ -41,170 +78,109 @@ func testProcessor001Process(t test.Tester) {
|
||||||
baseLabels: model.LabelSet{
|
baseLabels: model.LabelSet{
|
||||||
model.JobLabel: "batch_exporter",
|
model.JobLabel: "batch_exporter",
|
||||||
},
|
},
|
||||||
out: model.Samples{
|
expected: []*Result{
|
||||||
|
{
|
||||||
|
Samples: model.Samples{
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.0459814091918713,
|
Value: 0.0459814091918713,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 78.48563317257356,
|
Value: 78.48563317257356,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 15.890724674774395,
|
Value: 15.890724674774395,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.0459814091918713,
|
Value: 0.0459814091918713,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 78.48563317257356,
|
Value: 78.48563317257356,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 15.890724674774395,
|
Value: 15.890724674774395,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.6120456642749681,
|
Value: 0.6120456642749681,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 97.31798360385088,
|
Value: 97.31798360385088,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 84.63044031436561,
|
Value: 84.63044031436561,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 1.355915069887731,
|
Value: 1.355915069887731,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 109.89202084295582,
|
Value: 109.89202084295582,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 160.21100853053224,
|
Value: 160.21100853053224,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 1.772733213161236,
|
Value: 1.772733213161236,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 109.99626121011262,
|
Value: 109.99626121011262,
|
||||||
|
Timestamp: test001Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 172.49828748957728,
|
Value: 172.49828748957728,
|
||||||
|
Timestamp: test001Time,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, scenario := range scenarios {
|
for i, scenario := range scenarios {
|
||||||
inputChannel := make(chan *Result, 1024)
|
scenario.test(t, i)
|
||||||
|
|
||||||
defer close(inputChannel)
|
|
||||||
|
|
||||||
reader, err := os.Open(path.Join("fixtures", scenario.in))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
options := &ProcessOptions{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
BaseLabels: scenario.baseLabels,
|
|
||||||
}
|
|
||||||
err = Processor001.ProcessSingle(reader, inputChannel, options)
|
|
||||||
if !test.ErrorEqual(scenario.err, err) {
|
|
||||||
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
delivered := model.Samples{}
|
|
||||||
|
|
||||||
for len(inputChannel) != 0 {
|
|
||||||
result := <-inputChannel
|
|
||||||
if result.Err != nil {
|
|
||||||
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
|
|
||||||
}
|
|
||||||
delivered = append(delivered, result.Samples...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(delivered) != len(scenario.out) {
|
|
||||||
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedElements := list.New()
|
|
||||||
for _, j := range scenario.out {
|
|
||||||
expectedElements.PushBack(j)
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := 0; j < len(delivered); j++ {
|
|
||||||
actual := delivered[j]
|
|
||||||
|
|
||||||
found := false
|
|
||||||
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
|
|
||||||
candidate := element.Value.(*model.Sample)
|
|
||||||
|
|
||||||
if candidate.Value != actual.Value {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(candidate.Metric) != len(actual.Metric) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
labelsMatch := false
|
|
||||||
|
|
||||||
for key, value := range candidate.Metric {
|
|
||||||
actualValue, ok := actual.Metric[key]
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if actualValue == value {
|
|
||||||
labelsMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !labelsMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX: Test time.
|
|
||||||
found = true
|
|
||||||
expectedElements.Remove(element)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ type counter002 struct {
|
||||||
|
|
||||||
type processor002 struct{}
|
type processor002 struct{}
|
||||||
|
|
||||||
func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *ProcessOptions) error {
|
func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptions) error {
|
||||||
// Processor for telemetry schema version 0.0.2.
|
// Processor for telemetry schema version 0.0.2.
|
||||||
// container for telemetry data
|
// container for telemetry data
|
||||||
var entities []struct {
|
var entities []struct {
|
||||||
|
@ -60,8 +60,9 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
var values []counter002
|
var values []counter002
|
||||||
|
|
||||||
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
||||||
out <- &Result{
|
err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err)
|
||||||
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -81,8 +82,9 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
var values []histogram002
|
var values []histogram002
|
||||||
|
|
||||||
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
|
||||||
out <- &Result{
|
err := fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err)
|
||||||
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -102,14 +104,15 @@ func (p *processor002) ProcessSingle(in io.Reader, out chan<- *Result, o *Proces
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
out <- &Result{
|
err := fmt.Errorf("Unknown metric type %q", entity.Metric.Type)
|
||||||
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
|
if err := out.Ingest(&Result{Err: err}); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(pendingSamples) > 0 {
|
if len(pendingSamples) > 0 {
|
||||||
out <- &Result{Samples: pendingSamples}
|
return out.Ingest(&Result{Samples: pendingSamples})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -14,11 +14,11 @@
|
||||||
package extraction
|
package extraction
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,13 +26,50 @@ import (
|
||||||
"github.com/prometheus/client_golang/test"
|
"github.com/prometheus/client_golang/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testProcessor002Process(t test.Tester) {
|
var test002Time = time.Now()
|
||||||
var scenarios = []struct {
|
|
||||||
|
type testProcessor002ProcessScenario struct {
|
||||||
in string
|
in string
|
||||||
baseLabels model.LabelSet
|
baseLabels model.LabelSet
|
||||||
out model.Samples
|
expected, actual []*Result
|
||||||
err error
|
err error
|
||||||
}{
|
}
|
||||||
|
|
||||||
|
func (s *testProcessor002ProcessScenario) Ingest(r *Result) error {
|
||||||
|
s.actual = append(s.actual, r)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testProcessor002ProcessScenario) test(t test.Tester, set int) {
|
||||||
|
reader, err := os.Open(path.Join("fixtures", s.in))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%d. couldn't open scenario input file %s: %s", set, s.in, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
options := &ProcessOptions{
|
||||||
|
Timestamp: test002Time,
|
||||||
|
BaseLabels: s.baseLabels,
|
||||||
|
}
|
||||||
|
if err := Processor002.ProcessSingle(reader, s, options); !test.ErrorEqual(s.err, err) {
|
||||||
|
t.Fatalf("%d. expected err of %s, got %s", set, s.err, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.actual) != len(s.expected) {
|
||||||
|
t.Fatalf("%d. expected output length of %d, got %d", set, len(s.expected), len(s.actual))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, expected := range s.expected {
|
||||||
|
sort.Sort(s.actual[i].Samples)
|
||||||
|
sort.Sort(expected.Samples)
|
||||||
|
|
||||||
|
if !expected.equal(s.actual[i]) {
|
||||||
|
t.Fatalf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testProcessor002Process(t test.Tester) {
|
||||||
|
var scenarios = []testProcessor002ProcessScenario{
|
||||||
{
|
{
|
||||||
in: "empty.json",
|
in: "empty.json",
|
||||||
err: fmt.Errorf("EOF"),
|
err: fmt.Errorf("EOF"),
|
||||||
|
@ -42,170 +79,108 @@ func testProcessor002Process(t test.Tester) {
|
||||||
baseLabels: model.LabelSet{
|
baseLabels: model.LabelSet{
|
||||||
model.JobLabel: "batch_exporter",
|
model.JobLabel: "batch_exporter",
|
||||||
},
|
},
|
||||||
out: model.Samples{
|
expected: []*Result{
|
||||||
|
{
|
||||||
|
Samples: model.Samples{
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"},
|
||||||
Value: 25,
|
Value: 25,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.0459814091918713,
|
Value: 0.0459814091918713,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 78.48563317257356,
|
Value: 78.48563317257356,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 15.890724674774395,
|
Value: 15.890724674774395,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.0459814091918713,
|
Value: 0.0459814091918713,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 78.48563317257356,
|
Value: 78.48563317257356,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 15.890724674774395,
|
Value: 15.890724674774395,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 0.6120456642749681,
|
Value: 0.6120456642749681,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 97.31798360385088,
|
Value: 97.31798360385088,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 84.63044031436561,
|
Value: 84.63044031436561,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 1.355915069887731,
|
Value: 1.355915069887731,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 109.89202084295582,
|
Value: 109.89202084295582,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 160.21100853053224,
|
Value: 160.21100853053224,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"},
|
||||||
Value: 1.772733213161236,
|
Value: 1.772733213161236,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
|
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"},
|
||||||
Value: 109.99626121011262,
|
Value: 109.99626121011262,
|
||||||
|
Timestamp: test002Time,
|
||||||
},
|
},
|
||||||
&model.Sample{
|
&model.Sample{
|
||||||
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"},
|
||||||
Value: 172.49828748957728,
|
Value: 172.49828748957728,
|
||||||
|
Timestamp: test002Time,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, scenario := range scenarios {
|
for i, scenario := range scenarios {
|
||||||
inputChannel := make(chan *Result, 1024)
|
scenario.test(t, i)
|
||||||
|
|
||||||
defer close(inputChannel)
|
|
||||||
|
|
||||||
reader, err := os.Open(path.Join("fixtures", scenario.in))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%d. couldn't open scenario input file %s: %s", i, scenario.in, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
options := &ProcessOptions{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
BaseLabels: scenario.baseLabels,
|
|
||||||
}
|
|
||||||
err = Processor002.ProcessSingle(reader, inputChannel, options)
|
|
||||||
if !test.ErrorEqual(scenario.err, err) {
|
|
||||||
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
delivered := model.Samples{}
|
|
||||||
|
|
||||||
for len(inputChannel) != 0 {
|
|
||||||
result := <-inputChannel
|
|
||||||
if result.Err != nil {
|
|
||||||
t.Fatalf("%d. expected no error, got: %s", i, result.Err)
|
|
||||||
}
|
|
||||||
delivered = append(delivered, result.Samples...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(delivered) != len(scenario.out) {
|
|
||||||
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedElements := list.New()
|
|
||||||
for _, j := range scenario.out {
|
|
||||||
expectedElements.PushBack(j)
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := 0; j < len(delivered); j++ {
|
|
||||||
actual := delivered[j]
|
|
||||||
|
|
||||||
found := false
|
|
||||||
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
|
|
||||||
candidate := element.Value.(*model.Sample)
|
|
||||||
|
|
||||||
if candidate.Value != actual.Value {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(candidate.Metric) != len(actual.Metric) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
labelsMatch := false
|
|
||||||
|
|
||||||
for key, value := range candidate.Metric {
|
|
||||||
actualValue, ok := actual.Metric[key]
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if actualValue == value {
|
|
||||||
labelsMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !labelsMatch {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX: Test time.
|
|
||||||
found = true
|
|
||||||
expectedElements.Remove(element)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,10 @@ type Sample struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sample) Equal(o *Sample) bool {
|
func (s *Sample) Equal(o *Sample) bool {
|
||||||
|
if s == o {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
if !s.Metric.Equal(o.Metric) {
|
if !s.Metric.Equal(o.Metric) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue