diff --git a/extraction/processor.go b/extraction/processor.go index e832452..7e82e08 100644 --- a/extraction/processor.go +++ b/extraction/processor.go @@ -32,6 +32,32 @@ type Ingester interface { Ingest(*Result) error } +// MergeLabelsIngester merges a labelset ontop of a given extraction result and +// passes the result on to another ingester. Label collisions are avoided by +// appending a label prefix to any newly merged colliding labels. +type MergeLabelsIngester struct { + Labels model.LabelSet + CollisionPrefix model.LabelName + + Ingester Ingester +} + +func (i *MergeLabelsIngester) Ingest(r *Result) error { + for _, s := range r.Samples { + s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix) + } + + return i.Ingester.Ingest(r) +} + +// ChannelIngester feeds results into a channel without modifying them. +type ChannelIngester chan<- *Result + +func (i ChannelIngester) Ingest(r *Result) error { + i <- r + return nil +} + // Processor is responsible for decoding the actual message responses from // stream into a format that can be consumed with the end result written // to the results channel.