Support new base64 encoding for pushing to the Pushgateway

This should only be released after PGW 0.9 is released.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2019-07-22 20:31:04 +02:00
parent c0d684b8af
commit 4b95c4ab42
2 changed files with 95 additions and 43 deletions

View File

@ -36,6 +36,7 @@ package push
import (
"bytes"
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
@ -48,7 +49,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
const contentTypeHeader = "Content-Type"
const (
contentTypeHeader = "Content-Type"
// base64Suffix is appended to a label name in the request URL path to
// mark the following label value as base64 encoded.
base64Suffix = "@base64"
)
// HTTPDoer is an interface for the one method of http.Client that is used by Pusher
type HTTPDoer interface {
@ -77,9 +83,6 @@ type Pusher struct {
// name. You can use just host:port or ip:port as url, in which case “http://”
// is added automatically. Alternatively, include the schema in the
// URL. However, do not include the “/metrics/jobs/…” part.
//
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
// resolved, a “/” character in the job name is prohibited.
func New(url, job string) *Pusher {
var (
reg = prometheus.NewRegistry()
@ -91,9 +94,6 @@ func New(url, job string) *Pusher {
if strings.HasSuffix(url, "/") {
url = url[:len(url)-1]
}
if strings.Contains(job, "/") {
err = fmt.Errorf("job contains '/': %s", job)
}
return &Pusher{
error: err,
@ -155,19 +155,12 @@ func (p *Pusher) Collector(c prometheus.Collector) *Pusher {
// will lead to an error.
//
// For convenience, this method returns a pointer to the Pusher itself.
//
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
// resolved, this method does not allow a “/” character in the label value.
func (p *Pusher) Grouping(name, value string) *Pusher {
if p.error == nil {
if !model.LabelName(name).IsValid() {
p.error = fmt.Errorf("grouping label has invalid name: %s", name)
return p
}
if strings.Contains(value, "/") {
p.error = fmt.Errorf("value of grouping label %s contains '/': %s", name, value)
return p
}
p.grouping[name] = value
}
return p
@ -215,13 +208,7 @@ func (p *Pusher) Delete() error {
if p.error != nil {
return p.error
}
urlComponents := []string{url.QueryEscape(p.job)}
for ln, lv := range p.grouping {
urlComponents = append(urlComponents, ln, lv)
}
deleteURL := fmt.Sprintf("%s/metrics/job/%s", p.url, strings.Join(urlComponents, "/"))
req, err := http.NewRequest(http.MethodDelete, deleteURL, nil)
req, err := http.NewRequest(http.MethodDelete, p.fullURL(), nil)
if err != nil {
return err
}
@ -235,7 +222,7 @@ func (p *Pusher) Delete() error {
defer resp.Body.Close()
if resp.StatusCode != 202 {
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
return fmt.Errorf("unexpected status code %d while deleting %s: %s", resp.StatusCode, deleteURL, body)
return fmt.Errorf("unexpected status code %d while deleting %s: %s", resp.StatusCode, p.fullURL(), body)
}
return nil
}
@ -244,12 +231,6 @@ func (p *Pusher) push(method string) error {
if p.error != nil {
return p.error
}
urlComponents := []string{url.QueryEscape(p.job)}
for ln, lv := range p.grouping {
urlComponents = append(urlComponents, ln, lv)
}
pushURL := fmt.Sprintf("%s/metrics/job/%s", p.url, strings.Join(urlComponents, "/"))
mfs, err := p.gatherers.Gather()
if err != nil {
return err
@ -273,7 +254,7 @@ func (p *Pusher) push(method string) error {
}
enc.Encode(mf)
}
req, err := http.NewRequest(method, pushURL, buf)
req, err := http.NewRequest(method, p.fullURL(), buf)
if err != nil {
return err
}
@ -288,7 +269,40 @@ func (p *Pusher) push(method string) error {
defer resp.Body.Close()
if resp.StatusCode != 202 {
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, pushURL, body)
return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, p.fullURL(), body)
}
return nil
}
// fullURL assembles the URL used to push/delete metrics and returns it as a
// string. The job name and any grouping label values containing a '/' will
// trigger a base64 encoding of the affected component and proper suffixing of
// the preceding component. If the component does not contain a '/' but other
// special character, the usual url.QueryEscape is used for compatibility with
// older versions of the Pushgateway and for better readability.
func (p *Pusher) fullURL() string {
urlComponents := []string{}
if encodedJob, base64 := encodeComponent(p.job); base64 {
urlComponents = append(urlComponents, "job"+base64Suffix, encodedJob)
} else {
urlComponents = append(urlComponents, "job", encodedJob)
}
for ln, lv := range p.grouping {
if encodedLV, base64 := encodeComponent(lv); base64 {
urlComponents = append(urlComponents, ln+base64Suffix, encodedLV)
} else {
urlComponents = append(urlComponents, ln, encodedLV)
}
}
return fmt.Sprintf("%s/metrics/%s", p.url, strings.Join(urlComponents, "/"))
}
// encodeComponent encodes the provided string with base64.RawURLEncoding in
// case it contains '/'. If not, it uses url.QueryEscape instead. It returns
// true in the former case.
func encodeComponent(s string) (string, bool) {
if strings.Contains(s, "/") {
return base64.RawURLEncoding.EncodeToString([]byte(s)), true
}
return url.QueryEscape(s), false
}

View File

@ -120,6 +120,57 @@ func TestPush(t *testing.T) {
t.Error("unexpected path:", lastPath)
}
// Pushes that require base64 encoding.
if err := New(pgwOK.URL, "test/job").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job@base64/dGVzdC9qb2I" {
t.Error("unexpected path:", lastPath)
}
if err := New(pgwOK.URL, "testjob").
Grouping("foobar", "bu/ms").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/foobar@base64/YnUvbXM" {
t.Error("unexpected path:", lastPath)
}
// Push that requires URL encoding.
if err := New(pgwOK.URL, "testjob").
Grouping("titan", "Προμηθεύς").
Collector(metric1).
Collector(metric2).
Push(); err != nil {
t.Fatal(err)
}
if lastMethod != http.MethodPut {
t.Errorf("got method %q for Push, want %q", lastMethod, http.MethodPut)
}
if !bytes.Equal(lastBody, wantBody) {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/titan/%CE%A0%CF%81%CE%BF%CE%BC%CE%B7%CE%B8%CE%B5%CF%8D%CF%82" {
t.Error("unexpected path:", lastPath)
}
// Push some Collectors with a broken PGW.
if err := New(pgwErr.URL, "testjob").
Collector(metric1).
@ -140,19 +191,6 @@ func TestPush(t *testing.T) {
Push(); err == nil {
t.Error("push with grouping contained in metrics succeeded")
}
if err := New(pgwOK.URL, "test/job").
Collector(metric1).
Collector(metric2).
Push(); err == nil {
t.Error("push with invalid job value succeeded")
}
if err := New(pgwOK.URL, "testjob").
Grouping("foobar", "bu/ms").
Collector(metric1).
Collector(metric2).
Push(); err == nil {
t.Error("push with invalid grouping succeeded")
}
if err := New(pgwOK.URL, "testjob").
Grouping("foo-bar", "bums").
Collector(metric1).