Remove extraction result type, simplify code.

This commit is contained in:
Julius Volz 2014-12-31 13:53:17 +01:00
parent 376c7c732b
commit 0bb9a56250
9 changed files with 283 additions and 343 deletions

View File

@ -101,7 +101,7 @@ func extractCounter(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
sample.Value = model.SampleValue(m.Counter.GetValue()) sample.Value = model.SampleValue(m.Counter.GetValue())
} }
return out.Ingest(&Result{Samples: samples}) return out.Ingest(samples)
} }
func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
@ -132,7 +132,7 @@ func extractGauge(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
sample.Value = model.SampleValue(m.Gauge.GetValue()) sample.Value = model.SampleValue(m.Gauge.GetValue())
} }
return out.Ingest(&Result{Samples: samples}) return out.Ingest(samples)
} }
func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
@ -194,7 +194,7 @@ func extractSummary(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
} }
} }
return out.Ingest(&Result{Samples: samples}) return out.Ingest(samples)
} }
func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error { func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error {
@ -225,5 +225,5 @@ func extractUntyped(out Ingester, o *ProcessOptions, f *dto.MetricFamily) error
sample.Value = model.SampleValue(m.Untyped.GetValue()) sample.Value = model.SampleValue(m.Untyped.GetValue())
} }
return out.Ingest(&Result{Samples: samples}) return out.Ingest(samples)
} }

View File

@ -25,11 +25,11 @@ var testTime = model.Now()
type metricFamilyProcessorScenario struct { type metricFamilyProcessorScenario struct {
in string in string
expected, actual []*Result expected, actual []model.Samples
} }
func (s *metricFamilyProcessorScenario) Ingest(r *Result) error { func (s *metricFamilyProcessorScenario) Ingest(samples model.Samples) error {
s.actual = append(s.actual, r) s.actual = append(s.actual, samples)
return nil return nil
} }
@ -50,10 +50,10 @@ func (s *metricFamilyProcessorScenario) test(t *testing.T, set int) {
} }
for i, expected := range s.expected { for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples) sort.Sort(s.actual[i])
sort.Sort(expected.Samples) sort.Sort(expected)
if !expected.equal(s.actual[i]) { if !expected.Equal(s.actual[i]) {
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
} }
} }
@ -66,9 +66,8 @@ func TestMetricFamilyProcessor(t *testing.T) {
}, },
{ {
in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@", in: "\x8f\x01\n\rrequest_count\x12\x12Number of requests\x18\x00\"0\n#\n\x0fsome_label_name\x12\x10some_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00E\xc0\"6\n)\n\x12another_label_name\x12\x13another_label_value\x1a\t\t\x00\x00\x00\x00\x00\x00U@",
expected: []*Result{ expected: []model.Samples{
{ model.Samples{
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{model.MetricNameLabel: "request_count", "some_label_name": "some_label_value"}, Metric: model.Metric{model.MetricNameLabel: "request_count", "some_label_name": "some_label_value"},
Value: -42, Value: -42,
@ -82,12 +81,10 @@ func TestMetricFamilyProcessor(t *testing.T) {
}, },
}, },
}, },
},
{ {
in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@", in: "\xb9\x01\n\rrequest_count\x12\x12Number of requests\x18\x02\"O\n#\n\x0fsome_label_name\x12\x10some_label_value\"(\x1a\x12\t\xaeG\xe1z\x14\xae\xef?\x11\x00\x00\x00\x00\x00\x00E\xc0\x1a\x12\t+\x87\x16\xd9\xce\xf7\xef?\x11\x00\x00\x00\x00\x00\x00U\xc0\"A\n)\n\x12another_label_name\x12\x13another_label_value\"\x14\x1a\x12\t\x00\x00\x00\x00\x00\x00\xe0?\x11\x00\x00\x00\x00\x00\x00$@",
expected: []*Result{ expected: []model.Samples{
{ model.Samples{
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{model.MetricNameLabel: "request_count", "some_label_name": "some_label_value", "quantile": "0.99"}, Metric: model.Metric{model.MetricNameLabel: "request_count", "some_label_name": "some_label_value", "quantile": "0.99"},
Value: -42, Value: -42,
@ -106,7 +103,6 @@ func TestMetricFamilyProcessor(t *testing.T) {
}, },
}, },
}, },
},
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {

View File

@ -30,7 +30,7 @@ type ProcessOptions struct {
// Ingester consumes result streams in whatever way is desired by the user. // Ingester consumes result streams in whatever way is desired by the user.
type Ingester interface { type Ingester interface {
Ingest(*Result) error Ingest(model.Samples) error
} }
// Processor is responsible for decoding the actual message responses from // Processor is responsible for decoding the actual message responses from
@ -56,42 +56,6 @@ func labelSet(labels map[string]string) model.LabelSet {
return labelset return labelset
} }
// Result encapsulates the outcome from processing samples from a source.
type Result struct {
Err error
Samples model.Samples
}
func (r *Result) equal(o *Result) bool {
if r == o {
return true
}
if r.Err != o.Err {
if r.Err == nil || o.Err == nil {
return false
}
if r.Err.Error() != o.Err.Error() {
return false
}
}
if len(r.Samples) != len(o.Samples) {
return false
}
for i, mine := range r.Samples {
other := o.Samples[i]
if !mine.Equal(other) {
return false
}
}
return true
}
// A basic interface only useful in testing contexts for dispensing the time // A basic interface only useful in testing contexts for dispensing the time
// in a controlled manner. // in a controlled manner.
type instantProvider interface { type instantProvider interface {

View File

@ -77,11 +77,7 @@ func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
case gauge001, counter001: case gauge001, counter001:
sampleValue, ok := value.Value.(float64) sampleValue, ok := value.Value.(float64)
if !ok { if !ok {
err = fmt.Errorf("could not convert value from %s %s to float64", entity, value) return fmt.Errorf("could not convert value from %s %s to float64", entity, value)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
} }
pendingSamples = append(pendingSamples, &model.Sample{ pendingSamples = append(pendingSamples, &model.Sample{
@ -95,21 +91,13 @@ func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
case histogram001: case histogram001:
sampleValue, ok := value.Value.(map[string]interface{}) sampleValue, ok := value.Value.(map[string]interface{})
if !ok { if !ok {
err = fmt.Errorf("could not convert value from %q to a map[string]interface{}", value.Value) return fmt.Errorf("could not convert value from %q to a map[string]interface{}", value.Value)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
} }
for percentile, percentileValue := range sampleValue { for percentile, percentileValue := range sampleValue {
individualValue, ok := percentileValue.(float64) individualValue, ok := percentileValue.(float64)
if !ok { if !ok {
err = fmt.Errorf("could not convert value from %q to a float64", percentileValue) return fmt.Errorf("could not convert value from %q to a float64", percentileValue)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
} }
childMetric := make(map[model.LabelName]model.LabelValue, len(labels)+1) childMetric := make(map[model.LabelName]model.LabelValue, len(labels)+1)
@ -132,7 +120,7 @@ func (p *processor001) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
} }
} }
if len(pendingSamples) > 0 { if len(pendingSamples) > 0 {
return out.Ingest(&Result{Samples: pendingSamples}) return out.Ingest(pendingSamples)
} }
return nil return nil

View File

@ -29,12 +29,12 @@ var test001Time = model.Now()
type testProcessor001ProcessScenario struct { type testProcessor001ProcessScenario struct {
in string in string
expected, actual []*Result expected, actual []model.Samples
err error err error
} }
func (s *testProcessor001ProcessScenario) Ingest(r *Result) error { func (s *testProcessor001ProcessScenario) Ingest(samples model.Samples) error {
s.actual = append(s.actual, r) s.actual = append(s.actual, samples)
return nil return nil
} }
@ -56,10 +56,10 @@ func (s *testProcessor001ProcessScenario) test(t testing.TB, set int) {
} }
for i, expected := range s.expected { for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples) sort.Sort(s.actual[i])
sort.Sort(expected.Samples) sort.Sort(expected)
if !expected.equal(s.actual[i]) { if !expected.Equal(s.actual[i]) {
t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) t.Errorf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
} }
} }
@ -73,9 +73,8 @@ func testProcessor001Process(t testing.TB) {
}, },
{ {
in: "test0_0_1-0_0_2.json", in: "test0_0_1-0_0_2.json",
expected: []*Result{ expected: []model.Samples{
{ model.Samples{
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"}, Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25, Value: 25,
@ -107,7 +106,6 @@ func testProcessor001Process(t testing.TB) {
Timestamp: test001Time, Timestamp: test001Time,
}, },
&model.Sample{ &model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713, Value: 0.0459814091918713,
Timestamp: test001Time, Timestamp: test001Time,
@ -128,7 +126,6 @@ func testProcessor001Process(t testing.TB) {
Timestamp: test001Time, Timestamp: test001Time,
}, },
&model.Sample{ &model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 97.31798360385088, Value: 97.31798360385088,
Timestamp: test001Time, Timestamp: test001Time,
@ -171,7 +168,6 @@ func testProcessor001Process(t testing.TB) {
}, },
}, },
}, },
},
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {

View File

@ -60,11 +60,7 @@ func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
var values []counter002 var values []counter002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
err := fmt.Errorf("could not extract %s value: %s", entity.Metric.Type, err) return fmt.Errorf("could not extract %s value: %s", entity.Metric.Type, err)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
} }
for _, counter := range values { for _, counter := range values {
@ -81,11 +77,7 @@ func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
var values []histogram002 var values []histogram002
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil { if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
err := fmt.Errorf("could not extract %s value: %s", entity.Metric.Type, err) return fmt.Errorf("could not extract %s value: %s", entity.Metric.Type, err)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
continue
} }
for _, histogram := range values { for _, histogram := range values {
@ -102,15 +94,12 @@ func (p *processor002) ProcessSingle(in io.Reader, out Ingester, o *ProcessOptio
} }
default: default:
err := fmt.Errorf("unknown metric type %q", entity.Metric.Type) return fmt.Errorf("unknown metric type %q", entity.Metric.Type)
if err := out.Ingest(&Result{Err: err}); err != nil {
return err
}
} }
} }
if len(pendingSamples) > 0 { if len(pendingSamples) > 0 {
return out.Ingest(&Result{Samples: pendingSamples}) return out.Ingest(pendingSamples)
} }
return nil return nil

View File

@ -30,12 +30,12 @@ var test002Time = model.Now()
type testProcessor002ProcessScenario struct { type testProcessor002ProcessScenario struct {
in string in string
expected, actual []*Result expected, actual []model.Samples
err error err error
} }
func (s *testProcessor002ProcessScenario) Ingest(r *Result) error { func (s *testProcessor002ProcessScenario) Ingest(samples model.Samples) error {
s.actual = append(s.actual, r) s.actual = append(s.actual, samples)
return nil return nil
} }
@ -57,10 +57,10 @@ func (s *testProcessor002ProcessScenario) test(t testing.TB, set int) {
} }
for i, expected := range s.expected { for i, expected := range s.expected {
sort.Sort(s.actual[i].Samples) sort.Sort(s.actual[i])
sort.Sort(expected.Samples) sort.Sort(expected)
if !expected.equal(s.actual[i]) { if !expected.Equal(s.actual[i]) {
t.Fatalf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i]) t.Fatalf("%d.%d. expected %s, got %s", set, i, expected, s.actual[i])
} }
} }
@ -74,9 +74,8 @@ func testProcessor002Process(t testing.TB) {
}, },
{ {
in: "test0_0_1-0_0_2.json", in: "test0_0_1-0_0_2.json",
expected: []*Result{ expected: []model.Samples{
{ model.Samples{
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"}, Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job"},
Value: 25, Value: 25,
@ -129,7 +128,6 @@ func testProcessor002Process(t testing.TB) {
Timestamp: test002Time, Timestamp: test002Time,
}, },
&model.Sample{ &model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 97.31798360385088, Value: 97.31798360385088,
Timestamp: test002Time, Timestamp: test002Time,
@ -172,7 +170,6 @@ func testProcessor002Process(t testing.TB) {
}, },
}, },
}, },
},
} }
for i, scenario := range scenarios { for i, scenario := range scenarios {

View File

@ -32,9 +32,8 @@ mf1{label="value1"} -3.14 123456
mf1{label="value2"} 42 mf1{label="value2"} 42
mf2 4 mf2 4
` `
out = map[model.LabelValue]*Result{ out = map[model.LabelValue]model.Samples{
"mf1": { "mf1": model.Samples{
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{model.MetricNameLabel: "mf1", "label": "value1"}, Metric: model.Metric{model.MetricNameLabel: "mf1", "label": "value1"},
Value: -3.14, Value: -3.14,
@ -46,9 +45,7 @@ mf2 4
Timestamp: ts, Timestamp: ts,
}, },
}, },
}, "mf2": model.Samples{
"mf2": {
Samples: model.Samples{
&model.Sample{ &model.Sample{
Metric: model.Metric{model.MetricNameLabel: "mf2"}, Metric: model.Metric{model.MetricNameLabel: "mf2"},
Value: 3, Value: 3,
@ -60,16 +57,15 @@ mf2 4
Timestamp: ts, Timestamp: ts,
}, },
}, },
},
} }
) )
type testIngester struct { type testIngester struct {
results []*Result results []model.Samples
} }
func (i *testIngester) Ingest(r *Result) error { func (i *testIngester) Ingest(s model.Samples) error {
i.results = append(i.results, r) i.results = append(i.results, s)
return nil return nil
} }
@ -88,16 +84,16 @@ func TestTextProcessor(t *testing.T) {
t.Fatalf("Expected length %d, got %d", expected, got) t.Fatalf("Expected length %d, got %d", expected, got)
} }
for _, r := range ingester.results { for _, r := range ingester.results {
expected, ok := out[r.Samples[0].Metric[model.MetricNameLabel]] expected, ok := out[r[0].Metric[model.MetricNameLabel]]
if !ok { if !ok {
t.Fatalf( t.Fatalf(
"Unexpected metric name %q", "Unexpected metric name %q",
r.Samples[0].Metric[model.MetricNameLabel], r[0].Metric[model.MetricNameLabel],
) )
} }
sort.Sort(expected.Samples) sort.Sort(expected)
sort.Sort(r.Samples) sort.Sort(r)
if !expected.equal(r) { if !expected.Equal(r) {
t.Errorf("expected %s, got %s", expected, r) t.Errorf("expected %s, got %s", expected, r)
} }
} }

View File

@ -63,3 +63,17 @@ func (s Samples) Less(i, j int) bool {
func (s Samples) Swap(i, j int) { func (s Samples) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }
// Equal compares two sets of samples and returns true if they are equal.
func (s Samples) Equal(o Samples) bool {
if len(s) != len(o) {
return false
}
for i, sample := range s {
if !sample.Equal(o[i]) {
return false
}
}
return true
}