Merge pull request #624 from prometheus/beorn/push

Support new base64 encoding for pushing to the Pushgateway
This commit is contained in:
Björn Rabenstein 2019-07-22 21:57:04 +02:00 committed by GitHub
commit bb9b00a86e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 43 deletions

View File

@ -36,6 +36,7 @@ package push
import ( import (
"bytes" "bytes"
"encoding/base64"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -48,7 +49,12 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
const contentTypeHeader = "Content-Type" const (
contentTypeHeader = "Content-Type"
// base64Suffix is appended to a label name in the request URL path to
// mark the following label value as base64 encoded.
base64Suffix = "@base64"
)
// HTTPDoer is an interface for the one method of http.Client that is used by Pusher // HTTPDoer is an interface for the one method of http.Client that is used by Pusher
type HTTPDoer interface { type HTTPDoer interface {
@ -77,9 +83,6 @@ type Pusher struct {
// name. You can use just host:port or ip:port as url, in which case “http://” // name. You can use just host:port or ip:port as url, in which case “http://”
// is added automatically. Alternatively, include the schema in the // is added automatically. Alternatively, include the schema in the
// URL. However, do not include the “/metrics/jobs/…” part. // URL. However, do not include the “/metrics/jobs/…” part.
//
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
// resolved, a “/” character in the job name is prohibited.
func New(url, job string) *Pusher { func New(url, job string) *Pusher {
var ( var (
reg = prometheus.NewRegistry() reg = prometheus.NewRegistry()
@ -91,9 +94,6 @@ func New(url, job string) *Pusher {
if strings.HasSuffix(url, "/") { if strings.HasSuffix(url, "/") {
url = url[:len(url)-1] url = url[:len(url)-1]
} }
if strings.Contains(job, "/") {
err = fmt.Errorf("job contains '/': %s", job)
}
return &Pusher{ return &Pusher{
error: err, error: err,
@ -155,19 +155,12 @@ func (p *Pusher) Collector(c prometheus.Collector) *Pusher {
// will lead to an error. // will lead to an error.
// //
// For convenience, this method returns a pointer to the Pusher itself. // For convenience, this method returns a pointer to the Pusher itself.
//
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
// resolved, this method does not allow a “/” character in the label value.
func (p *Pusher) Grouping(name, value string) *Pusher { func (p *Pusher) Grouping(name, value string) *Pusher {
if p.error == nil { if p.error == nil {
if !model.LabelName(name).IsValid() { if !model.LabelName(name).IsValid() {
p.error = fmt.Errorf("grouping label has invalid name: %s", name) p.error = fmt.Errorf("grouping label has invalid name: %s", name)
return p return p
} }
if strings.Contains(value, "/") {
p.error = fmt.Errorf("value of grouping label %s contains '/': %s", name, value)
return p
}
p.grouping[name] = value p.grouping[name] = value
} }
return p return p
@ -215,13 +208,7 @@ func (p *Pusher) Delete() error {
if p.error != nil { if p.error != nil {
return p.error return p.error
} }
urlComponents := []string{url.QueryEscape(p.job)} req, err := http.NewRequest(http.MethodDelete, p.fullURL(), nil)
for ln, lv := range p.grouping {
urlComponents = append(urlComponents, ln, lv)
}
deleteURL := fmt.Sprintf("%s/metrics/job/%s", p.url, strings.Join(urlComponents, "/"))
req, err := http.NewRequest(http.MethodDelete, deleteURL, nil)
if err != nil { if err != nil {
return err return err
} }
@ -235,7 +222,7 @@ func (p *Pusher) Delete() error {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 202 { if resp.StatusCode != 202 {
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only. body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
return fmt.Errorf("unexpected status code %d while deleting %s: %s", resp.StatusCode, deleteURL, body) return fmt.Errorf("unexpected status code %d while deleting %s: %s", resp.StatusCode, p.fullURL(), body)
} }
return nil return nil
} }
@ -244,12 +231,6 @@ func (p *Pusher) push(method string) error {
if p.error != nil { if p.error != nil {
return p.error return p.error
} }
urlComponents := []string{url.QueryEscape(p.job)}
for ln, lv := range p.grouping {
urlComponents = append(urlComponents, ln, lv)
}
pushURL := fmt.Sprintf("%s/metrics/job/%s", p.url, strings.Join(urlComponents, "/"))
mfs, err := p.gatherers.Gather() mfs, err := p.gatherers.Gather()
if err != nil { if err != nil {
return err return err
@ -273,7 +254,7 @@ func (p *Pusher) push(method string) error {
} }
enc.Encode(mf) enc.Encode(mf)
} }
req, err := http.NewRequest(method, pushURL, buf) req, err := http.NewRequest(method, p.fullURL(), buf)
if err != nil { if err != nil {
return err return err
} }
@ -288,7 +269,40 @@ func (p *Pusher) push(method string) error {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 202 { if resp.StatusCode != 202 {
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only. body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, pushURL, body) return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, p.fullURL(), body)
} }
return nil return nil
} }
// fullURL assembles the URL used to push/delete metrics and returns it as a
// string. The job name and any grouping label values containing a '/' will
// trigger a base64 encoding of the affected component and proper suffixing of
// the preceding component. If the component does not contain a '/' but other
// special character, the usual url.QueryEscape is used for compatibility with
// older versions of the Pushgateway and for better readability.
func (p *Pusher) fullURL() string {
urlComponents := []string{}
if encodedJob, base64 := encodeComponent(p.job); base64 {
urlComponents = append(urlComponents, "job"+base64Suffix, encodedJob)
} else {
urlComponents = append(urlComponents, "job", encodedJob)
}
for ln, lv := range p.grouping {
if encodedLV, base64 := encodeComponent(lv); base64 {
urlComponents = append(urlComponents, ln+base64Suffix, encodedLV)
} else {
urlComponents = append(urlComponents, ln, encodedLV)
}
}
return fmt.Sprintf("%s/metrics/%s", p.url, strings.Join(urlComponents, "/"))
}
// encodeComponent encodes the provided string with base64.RawURLEncoding in
// case it contains '/'. If not, it uses url.QueryEscape instead. It returns
// true in the former case.
func encodeComponent(s string) (string, bool) {
if strings.Contains(s, "/") {
return base64.RawURLEncoding.EncodeToString([]byte(s)), true
}
return url.QueryEscape(s), false
}

View File

@ -120,6 +120,57 @@ func TestPush(t *testing.T) {
t.Error("unexpected path:", lastPath) t.Error("unexpected path:", lastPath)
} }
// Pushes that require base64 encoding.
if err := New(pgwOK.URL, "test/job").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job@base64/dGVzdC9qb2I" {
t.Error("unexpected path:", lastPath)
}
if err := New(pgwOK.URL, "testjob").
Grouping("foobar", "bu/ms").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/foobar@base64/YnUvbXM" {
t.Error("unexpected path:", lastPath)
}
// Push that requires URL encoding.
if err := New(pgwOK.URL, "testjob").
Grouping("titan", "Προμηθεύς").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/titan/%CE%A0%CF%81%CE%BF%CE%BC%CE%B7%CE%B8%CE%B5%CF%8D%CF%82" {
t.Error("unexpected path:", lastPath)
}
// Push some Collectors with a broken PGW. // Push some Collectors with a broken PGW.
if err := New(pgwErr.URL, "testjob"). if err := New(pgwErr.URL, "testjob").
Collector(metric1). Collector(metric1).
@ -140,19 +191,6 @@ func TestPush(t *testing.T) {
Push(); err == nil { Push(); err == nil {
t.Error("push with grouping contained in metrics succeeded") t.Error("push with grouping contained in metrics succeeded")
} }
if err := New(pgwOK.URL, "test/job").
Collector(metric1).
Collector(metric2).
Push(); err == nil {
t.Error("push with invalid job value succeeded")
}
if err := New(pgwOK.URL, "testjob").
Grouping("foobar", "bu/ms").
Collector(metric1).
Collector(metric2).
Push(); err == nil {
t.Error("push with invalid grouping succeeded")
}
if err := New(pgwOK.URL, "testjob"). if err := New(pgwOK.URL, "testjob").
Grouping("foo-bar", "bums"). Grouping("foo-bar", "bums").
Collector(metric1). Collector(metric1).