Merge pull request #668 from kamaev/master

Send Graphite metrics with tags
This commit is contained in:
Björn Rabenstein 2019-10-25 01:19:15 +02:00 committed by GitHub
commit 333f01cef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 183 additions and 27 deletions

View File

@ -54,6 +54,9 @@ const (
// Config defines the Graphite bridge config.
type Config struct {
// Whether to use Graphite tags or not. Defaults to false.
UseTags bool
// The url to push data to. Required.
URL string
@ -80,6 +83,7 @@ type Config struct {
// Bridge pushes metrics to the configured Graphite server.
type Bridge struct {
useTags bool
url string
prefix string
interval time.Duration
@ -102,6 +106,8 @@ type Logger interface {
func NewBridge(c *Config) (*Bridge, error) {
b := &Bridge{}
b.useTags = c.UseTags
if c.URL == "" {
return nil, errors.New("missing URL")
}
@ -178,10 +184,10 @@ func (b *Bridge) Push() error {
}
defer conn.Close()
return writeMetrics(conn, mfs, b.prefix, model.Now())
return writeMetrics(conn, mfs, b.useTags, b.prefix, model.Now())
}
func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, useTags bool, prefix string, now model.Time) error {
vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{
Timestamp: now,
}, mfs...)
@ -199,7 +205,7 @@ func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model
if err := buf.WriteByte('.'); err != nil {
return err
}
if err := writeMetric(buf, s.Metric); err != nil {
if err := writeMetric(buf, s.Metric, useTags); err != nil {
return err
}
if _, err := fmt.Fprintf(buf, " %g %d\n", s.Value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
@ -213,20 +219,13 @@ func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model
return nil
}
func writeMetric(buf *bufio.Writer, m model.Metric) error {
func writeMetric(buf *bufio.Writer, m model.Metric, useTags bool) error {
metricName, hasName := m[model.MetricNameLabel]
numLabels := len(m) - 1
if !hasName {
numLabels = len(m)
}
labelStrings := make([]string, 0, numLabels)
for label, value := range m {
if label != model.MetricNameLabel {
labelStrings = append(labelStrings, fmt.Sprintf("%s %s", string(label), string(value)))
}
}
var err error
switch numLabels {
case 0:
@ -234,19 +233,51 @@ func writeMetric(buf *bufio.Writer, m model.Metric) error {
return writeSanitized(buf, string(metricName))
}
default:
sort.Strings(labelStrings)
if err = writeSanitized(buf, string(metricName)); err != nil {
return err
}
if useTags {
return writeTags(buf, m)
} else {
return writeLabels(buf, m, numLabels)
}
}
return nil
}
func writeTags(buf *bufio.Writer, m model.Metric) error {
for label, value := range m {
if label != model.MetricNameLabel {
buf.WriteRune(';')
if _, err := buf.WriteString(string(label)); err != nil {
return err
}
buf.WriteRune('=')
if _, err := buf.WriteString(string(value)); err != nil {
return err
}
}
}
return nil
}
func writeLabels(buf *bufio.Writer, m model.Metric, numLabels int) error {
labelStrings := make([]string, 0, numLabels)
for label, value := range m {
if label != model.MetricNameLabel {
labelString := string(label) + " " + string(value)
labelStrings = append(labelStrings, labelString)
}
}
sort.Strings(labelStrings)
for _, s := range labelStrings {
if err = buf.WriteByte('.'); err != nil {
if err := buf.WriteByte('.'); err != nil {
return err
}
if err = writeSanitized(buf, s); err != nil {
if err := writeSanitized(buf, s); err != nil {
return err
}
}
}
return nil
}

View File

@ -22,7 +22,10 @@ import (
"log"
"net"
"os"
"reflect"
"regexp"
"sort"
"strings"
"testing"
"time"
@ -62,6 +65,11 @@ func TestSanitize(t *testing.T) {
}
func TestWriteSummary(t *testing.T) {
testWriteSummary(t, false)
testWriteSummary(t, true)
}
func testWriteSummary(t *testing.T, useTags bool) {
sumVec := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "name",
@ -95,7 +103,8 @@ func TestWriteSummary(t *testing.T) {
{prefix: "pre.fix"},
}
const want = `%s.name.constname.constvalue.labelname.val1.quantile.0_5 20 1477043
var (
want = `%s.name.constname.constvalue.labelname.val1.quantile.0_5 20 1477043
%s.name.constname.constvalue.labelname.val1.quantile.0_9 30 1477043
%s.name.constname.constvalue.labelname.val1.quantile.0_99 30 1477043
%s.name_sum.constname.constvalue.labelname.val1 60 1477043
@ -106,11 +115,28 @@ func TestWriteSummary(t *testing.T) {
%s.name_sum.constname.constvalue.labelname.val2 90 1477043
%s.name_count.constname.constvalue.labelname.val2 3 1477043
`
wantTagged = `%s.name;constname=constvalue;labelname=val1;quantile=0.5 20 1477043
%s.name;constname=constvalue;labelname=val1;quantile=0.9 30 1477043
%s.name;constname=constvalue;labelname=val1;quantile=0.99 30 1477043
%s.name_sum;constname=constvalue;labelname=val1 60 1477043
%s.name_count;constname=constvalue;labelname=val1 3 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.5 30 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.9 40 1477043
%s.name;constname=constvalue;labelname=val2;quantile=0.99 40 1477043
%s.name_sum;constname=constvalue;labelname=val2 90 1477043
%s.name_count;constname=constvalue;labelname=val2 3 1477043
`
)
if useTags {
want = wantTagged
}
for i, tc := range testCases {
now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, tc.prefix, now)
err = writeMetrics(&buf, mfs, useTags, tc.prefix, now)
if err != nil {
t.Fatalf("error: %v", err)
}
@ -119,13 +145,21 @@ func TestWriteSummary(t *testing.T) {
tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix,
tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix,
)
if got := buf.String(); wantWithPrefix != got {
t.Fatalf("test case index %d: wanted \n%s\n, got \n%s\n", i, wantWithPrefix, got)
got := buf.String()
if err := checkLinesAreEqual(wantWithPrefix, got, useTags); err != nil {
t.Fatalf("test case index %d:\n%s", i, err.Error())
}
}
}
func TestWriteHistogram(t *testing.T) {
testWriteHistogram(t, false)
testWriteHistogram(t, true)
}
func testWriteHistogram(t *testing.T, useTags bool) {
histVec := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "name",
@ -153,12 +187,13 @@ func TestWriteHistogram(t *testing.T) {
now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, "prefix", now)
err = writeMetrics(&buf, mfs, useTags, "prefix", now)
if err != nil {
t.Fatalf("error: %v", err)
}
want := `prefix.name_bucket.constname.constvalue.labelname.val1.le.0_01 0 1477043
var (
want = `prefix.name_bucket.constname.constvalue.labelname.val1.le.0_01 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_02 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_05 0 1477043
prefix.name_bucket.constname.constvalue.labelname.val1.le.0_1 0 1477043
@ -173,12 +208,40 @@ prefix.name_sum.constname.constvalue.labelname.val2 90 1477043
prefix.name_count.constname.constvalue.labelname.val2 3 1477043
prefix.name_bucket.constname.constvalue.labelname.val2.le._Inf 3 1477043
`
if got := buf.String(); want != got {
t.Fatalf("wanted \n%s\n, got \n%s\n", want, got)
wantTagged = `prefix.name_bucket;constname=constvalue;labelname=val1;le=0.01 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.02 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.05 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=0.1 0 1477043
prefix.name_sum;constname=constvalue;labelname=val1 60 1477043
prefix.name_count;constname=constvalue;labelname=val1 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val1;le=+Inf 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.01 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.02 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.05 0 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=0.1 0 1477043
prefix.name_sum;constname=constvalue;labelname=val2 90 1477043
prefix.name_count;constname=constvalue;labelname=val2 3 1477043
prefix.name_bucket;constname=constvalue;labelname=val2;le=+Inf 3 1477043
`
)
if useTags {
want = wantTagged
}
got := buf.String()
if err := checkLinesAreEqual(want, got, useTags); err != nil {
t.Fatalf(err.Error())
}
}
func TestToReader(t *testing.T) {
testToReader(t, false)
testToReader(t, true)
}
func testToReader(t *testing.T, useTags bool) {
cntVec := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "name",
@ -193,9 +256,19 @@ func TestToReader(t *testing.T) {
reg := prometheus.NewRegistry()
reg.MustRegister(cntVec)
want := `prefix.name.constname.constvalue.labelname.val1 1 1477043
var (
want = `prefix.name.constname.constvalue.labelname.val1 1 1477043
prefix.name.constname.constvalue.labelname.val2 1 1477043
`
wantTagged = `prefix.name;constname=constvalue;labelname=val1 1 1477043
prefix.name;constname=constvalue;labelname=val2 1 1477043
`
)
if useTags {
want = wantTagged
}
mfs, err := reg.Gather()
if err != nil {
t.Fatalf("error: %v", err)
@ -203,16 +276,68 @@ prefix.name.constname.constvalue.labelname.val2 1 1477043
now := model.Time(1477043083)
var buf bytes.Buffer
err = writeMetrics(&buf, mfs, "prefix", now)
err = writeMetrics(&buf, mfs, useTags, "prefix", now)
if err != nil {
t.Fatalf("error: %v", err)
}
if got := buf.String(); want != got {
t.Fatalf("wanted \n%s\n, got \n%s\n", want, got)
got := buf.String()
if err := checkLinesAreEqual(want, got, useTags); err != nil {
t.Fatalf(err.Error())
}
}
func checkLinesAreEqual(w, g string, useTags bool) error {
if useTags {
taggedLineRegexp := regexp.MustCompile(`;| `)
wantLines, err := stringToLines(w)
if err != nil {
return err
}
gotLines, err := stringToLines(g)
if err != nil {
return err
}
for lineInd := range gotLines {
var log string
// Tagged metric, order of tags doesn't matter
// m1 := "prefix.name;tag1=val1;tag2=val2 3 1477043"
// m2 := "prefix.name;tag2=val2;tag1=val1 3 1477043"
// m1 should be equal to m2
wantSplit := taggedLineRegexp.Split(wantLines[lineInd], -1)
gotSplit := taggedLineRegexp.Split(gotLines[lineInd], -1)
sort.Strings(wantSplit)
sort.Strings(gotSplit)
log += fmt.Sprintf("want: %v\ngot: %v\n\n", wantSplit, gotSplit)
if !reflect.DeepEqual(wantSplit, gotSplit) {
return fmt.Errorf(log)
}
}
return nil
}
if w != g {
return fmt.Errorf("wanted:\n\n%s\ngot:\n\n%s", w, g)
}
return nil
}
func stringToLines(s string) (lines []string, err error) {
scanner := bufio.NewScanner(strings.NewReader(s))
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
err = scanner.Err()
return
}
func TestPush(t *testing.T) {
reg := prometheus.NewRegistry()
cntVec := prometheus.NewCounterVec(