Add completely new push syntax
This allows adding more options in elegant ways, showcased here by HTTP basic auth and by injecting a custom http.Client. Fixes #341 and #372.
This commit is contained in:
parent
a40133b69f
commit
3c2baee2d1
|
@ -0,0 +1,175 @@
|
||||||
|
// Copyright 2018 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
// Copyright (c) 2013, The Prometheus Authors
|
||||||
|
// All rights reserved.
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be found
|
||||||
|
// in the LICENSE file.
|
||||||
|
|
||||||
|
package push
|
||||||
|
|
||||||
|
// This file contains only deprecated code. Remove after v0.9 is released.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/expfmt"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FromGatherer triggers a metric collection by the provided Gatherer (which is
|
||||||
|
// usually implemented by a prometheus.Registry) and pushes all gathered metrics
|
||||||
|
// to the Pushgateway specified by url, using the provided job name and the
|
||||||
|
// (optional) further grouping labels (the grouping map may be nil). See the
|
||||||
|
// Pushgateway documentation for detailed implications of the job and other
|
||||||
|
// grouping labels. Neither the job name nor any grouping label value may
|
||||||
|
// contain a "/". The metrics pushed must not contain a job label of their own
|
||||||
|
// nor any of the grouping labels.
|
||||||
|
//
|
||||||
|
// You can use just host:port or ip:port as url, in which case 'http://' is
|
||||||
|
// added automatically. You can also include the schema in the URL. However, do
|
||||||
|
// not include the '/metrics/jobs/...' part.
|
||||||
|
//
|
||||||
|
// Note that all previously pushed metrics with the same job and other grouping
|
||||||
|
// labels will be replaced with the metrics pushed by this call. (It uses HTTP
|
||||||
|
// method 'PUT' to push to the Pushgateway.)
|
||||||
|
//
|
||||||
|
// Deprecated: Please use a Pusher created with New instead.
|
||||||
|
func FromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
|
||||||
|
return push(job, grouping, url, g, "PUT")
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddFromGatherer works like FromGatherer, but only previously pushed metrics
|
||||||
|
// with the same name (and the same job and other grouping labels) will be
|
||||||
|
// replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
|
||||||
|
//
|
||||||
|
// Deprecated: Please use a Pusher created with New instead.
|
||||||
|
func AddFromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
|
||||||
|
return push(job, grouping, url, g, "POST")
|
||||||
|
}
|
||||||
|
|
||||||
|
func push(job string, grouping map[string]string, pushURL string, g prometheus.Gatherer, method string) error {
|
||||||
|
if !strings.Contains(pushURL, "://") {
|
||||||
|
pushURL = "http://" + pushURL
|
||||||
|
}
|
||||||
|
if strings.HasSuffix(pushURL, "/") {
|
||||||
|
pushURL = pushURL[:len(pushURL)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(job, "/") {
|
||||||
|
return fmt.Errorf("job contains '/': %s", job)
|
||||||
|
}
|
||||||
|
urlComponents := []string{url.QueryEscape(job)}
|
||||||
|
for ln, lv := range grouping {
|
||||||
|
if !model.LabelName(ln).IsValid() {
|
||||||
|
return fmt.Errorf("grouping label has invalid name: %s", ln)
|
||||||
|
}
|
||||||
|
if strings.Contains(lv, "/") {
|
||||||
|
return fmt.Errorf("value of grouping label %s contains '/': %s", ln, lv)
|
||||||
|
}
|
||||||
|
urlComponents = append(urlComponents, ln, lv)
|
||||||
|
}
|
||||||
|
pushURL = fmt.Sprintf("%s/metrics/job/%s", pushURL, strings.Join(urlComponents, "/"))
|
||||||
|
|
||||||
|
mfs, err := g.Gather()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
enc := expfmt.NewEncoder(buf, expfmt.FmtProtoDelim)
|
||||||
|
// Check for pre-existing grouping labels:
|
||||||
|
for _, mf := range mfs {
|
||||||
|
for _, m := range mf.GetMetric() {
|
||||||
|
for _, l := range m.GetLabel() {
|
||||||
|
if l.GetName() == "job" {
|
||||||
|
return fmt.Errorf("pushed metric %s (%s) already contains a job label", mf.GetName(), m)
|
||||||
|
}
|
||||||
|
if _, ok := grouping[l.GetName()]; ok {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"pushed metric %s (%s) already contains grouping label %s",
|
||||||
|
mf.GetName(), m, l.GetName(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enc.Encode(mf)
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest(method, pushURL, buf)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set(contentTypeHeader, string(expfmt.FmtProtoDelim))
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != 202 {
|
||||||
|
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
|
||||||
|
return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, pushURL, body)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collectors works like FromGatherer, but it does not use a Gatherer. Instead,
|
||||||
|
// it collects from the provided collectors directly. It is a convenient way to
|
||||||
|
// push only a few metrics.
|
||||||
|
//
|
||||||
|
// Deprecated: Please use a Pusher created with New instead.
|
||||||
|
func Collectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
|
||||||
|
return pushCollectors(job, grouping, url, "PUT", collectors...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddCollectors works like AddFromGatherer, but it does not use a Gatherer.
|
||||||
|
// Instead, it collects from the provided collectors directly. It is a
|
||||||
|
// convenient way to push only a few metrics.
|
||||||
|
//
|
||||||
|
// Deprecated: Please use a Pusher created with New instead.
|
||||||
|
func AddCollectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
|
||||||
|
return pushCollectors(job, grouping, url, "POST", collectors...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func pushCollectors(job string, grouping map[string]string, url, method string, collectors ...prometheus.Collector) error {
|
||||||
|
r := prometheus.NewRegistry()
|
||||||
|
for _, collector := range collectors {
|
||||||
|
if err := r.Register(collector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return push(job, grouping, url, r, method)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HostnameGroupingKey returns a label map with the only entry
|
||||||
|
// {instance="<hostname>"}. This can be conveniently used as the grouping
|
||||||
|
// parameter if metrics should be pushed with the hostname as label. The
|
||||||
|
// returned map is created upon each call so that the caller is free to add more
|
||||||
|
// labels to the map.
|
||||||
|
//
|
||||||
|
// Deprecated: It is recommended to use a Pusher created with New, which has a
|
||||||
|
// method HostnameGrouping.
|
||||||
|
func HostnameGroupingKey() map[string]string {
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
return map[string]string{"instance": "unknown"}
|
||||||
|
}
|
||||||
|
return map[string]string{"instance": hostname}
|
||||||
|
}
|
|
@ -53,10 +53,14 @@ func performBackup() (int, error) {
|
||||||
return 42, nil
|
return 42, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExampleAddFromGatherer() {
|
func ExamplePusher_Add() {
|
||||||
|
// We use a registry here to benefit from the consistency checks that
|
||||||
|
// happen during registration.
|
||||||
registry := prometheus.NewRegistry()
|
registry := prometheus.NewRegistry()
|
||||||
registry.MustRegister(completionTime, duration, records)
|
registry.MustRegister(completionTime, duration, records)
|
||||||
// Note that successTime is not registered at this time.
|
// Note that successTime is not registered.
|
||||||
|
|
||||||
|
pusher := push.New("http://pushgateway:9091", "db_backup").Gatherer(registry)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
n, err := performBackup()
|
n, err := performBackup()
|
||||||
|
@ -67,18 +71,16 @@ func ExampleAddFromGatherer() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("DB backup failed:", err)
|
fmt.Println("DB backup failed:", err)
|
||||||
} else {
|
} else {
|
||||||
// Only now register successTime.
|
// Add successTime to pusher only in case of success.
|
||||||
registry.MustRegister(successTime)
|
// We could as well register it with the registry.
|
||||||
|
// This example, however, demonstrates that you can
|
||||||
|
// mix Gatherers and Collectors when handling a Pusher.
|
||||||
|
pusher.Collector(successTime)
|
||||||
successTime.SetToCurrentTime()
|
successTime.SetToCurrentTime()
|
||||||
}
|
}
|
||||||
// AddFromGatherer is used here rather than FromGatherer to not delete a
|
// Add is used here rather than Push to not delete a previously pushed
|
||||||
// previously pushed success timestamp in case of a failure of this
|
// success timestamp in case of a failure of this backup.
|
||||||
// backup.
|
if err := pusher.Add(); err != nil {
|
||||||
if err := push.AddFromGatherer(
|
|
||||||
"db_backup", nil,
|
|
||||||
"http://pushgateway:9091",
|
|
||||||
registry,
|
|
||||||
); err != nil {
|
|
||||||
fmt.Println("Could not push to Pushgateway:", err)
|
fmt.Println("Could not push to Pushgateway:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,17 +20,16 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus/push"
|
"github.com/prometheus/client_golang/prometheus/push"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ExampleCollectors() {
|
func ExamplePusher_Push() {
|
||||||
completionTime := prometheus.NewGauge(prometheus.GaugeOpts{
|
completionTime := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Name: "db_backup_last_completion_timestamp_seconds",
|
Name: "db_backup_last_completion_timestamp_seconds",
|
||||||
Help: "The timestamp of the last successful completion of a DB backup.",
|
Help: "The timestamp of the last successful completion of a DB backup.",
|
||||||
})
|
})
|
||||||
completionTime.SetToCurrentTime()
|
completionTime.SetToCurrentTime()
|
||||||
if err := push.Collectors(
|
if err := push.New("http://pushgateway:9091", "db_backup").
|
||||||
"db_backup", push.HostnameGroupingKey(),
|
Collector(completionTime).
|
||||||
"http://pushgateway:9091",
|
HostnameGrouping().
|
||||||
completionTime,
|
Push(); err != nil {
|
||||||
); err != nil {
|
|
||||||
fmt.Println("Could not push completion time to Pushgateway:", err)
|
fmt.Println("Could not push completion time to Pushgateway:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,20 +11,28 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// Copyright (c) 2013, The Prometheus Authors
|
// Package push provides functions to push metrics to a Pushgateway. It uses a
|
||||||
// All rights reserved.
|
// builder approach. Create a Pusher with New and then add the various options
|
||||||
|
// by using its methods, finally calling Add or Push, like this:
|
||||||
//
|
//
|
||||||
// Use of this source code is governed by a BSD-style license that can be found
|
// // Easy case:
|
||||||
// in the LICENSE file.
|
// push.New("http://example.org/metrics", "my_job").Gatherer(myRegistry).Push()
|
||||||
|
|
||||||
// Package push provides functions to push metrics to a Pushgateway. The metrics
|
|
||||||
// to push are either collected from a provided registry, or from explicitly
|
|
||||||
// listed collectors.
|
|
||||||
//
|
//
|
||||||
// See the documentation of the Pushgateway to understand the meaning of the
|
// // Complex case:
|
||||||
// grouping parameters and the differences between push.Registry and
|
// push.New("http://example.org/metrics", "my_job").
|
||||||
// push.Collectors on the one hand and push.AddRegistry and push.AddCollectors
|
// Collector(myCollector1).
|
||||||
// on the other hand: https://github.com/prometheus/pushgateway
|
// Collector(myCollector2).
|
||||||
|
// HostnameGrouping().
|
||||||
|
// Grouping("zone", "xy").
|
||||||
|
// Client(&myHTTPClient).
|
||||||
|
// BasicAuth("top", "secret").
|
||||||
|
// Add()
|
||||||
|
//
|
||||||
|
// See the examples section for more detailed examples.
|
||||||
|
//
|
||||||
|
// See the documentation of the Pushgateway to understand the meaning of
|
||||||
|
// the grouping key and the differences between Push and Add:
|
||||||
|
// https://github.com/prometheus/pushgateway
|
||||||
package push
|
package push
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -44,57 +52,164 @@ import (
|
||||||
|
|
||||||
const contentTypeHeader = "Content-Type"
|
const contentTypeHeader = "Content-Type"
|
||||||
|
|
||||||
// FromGatherer triggers a metric collection by the provided Gatherer (which is
|
// Pusher manages a push to the Pushgateway. Use New to create one, configure it
|
||||||
// usually implemented by a prometheus.Registry) and pushes all gathered metrics
|
// with its methods, and finally use the Add or Push method to push.
|
||||||
// to the Pushgateway specified by url, using the provided job name and the
|
type Pusher struct {
|
||||||
// (optional) further grouping labels (the grouping map may be nil). See the
|
error error
|
||||||
// Pushgateway documentation for detailed implications of the job and other
|
|
||||||
// grouping labels. Neither the job name nor any grouping label value may
|
url, job string
|
||||||
// contain a "/". The metrics pushed must not contain a job label of their own
|
grouping map[string]string
|
||||||
// nor any of the grouping labels.
|
|
||||||
//
|
gatherers prometheus.Gatherers
|
||||||
// You can use just host:port or ip:port as url, in which case 'http://' is
|
registerer prometheus.Registerer
|
||||||
// added automatically. You can also include the schema in the URL. However, do
|
|
||||||
// not include the '/metrics/jobs/...' part.
|
client *http.Client
|
||||||
//
|
useBasicAuth bool
|
||||||
// Note that all previously pushed metrics with the same job and other grouping
|
username, password string
|
||||||
// labels will be replaced with the metrics pushed by this call. (It uses HTTP
|
|
||||||
// method 'PUT' to push to the Pushgateway.)
|
|
||||||
func FromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
|
|
||||||
return push(job, grouping, url, g, "PUT")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddFromGatherer works like FromGatherer, but only previously pushed metrics
|
// New creates a new Pusher to push to the provided URL withe the provided job
|
||||||
// with the same name (and the same job and other grouping labels) will be
|
// name. You can use just host:port or ip:port as url, in which case “http://”
|
||||||
// replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
|
// is added automatically. Alternatively, include the schema in the
|
||||||
func AddFromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
|
// URL. However, do not include the “/metrics/jobs/…” part.
|
||||||
return push(job, grouping, url, g, "POST")
|
//
|
||||||
}
|
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
|
||||||
|
// resolved, a “/” character in the job name is prohibited.
|
||||||
func push(job string, grouping map[string]string, pushURL string, g prometheus.Gatherer, method string) error {
|
func New(url, job string) *Pusher {
|
||||||
if !strings.Contains(pushURL, "://") {
|
var (
|
||||||
pushURL = "http://" + pushURL
|
reg = prometheus.NewRegistry()
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if !strings.Contains(url, "://") {
|
||||||
|
url = "http://" + url
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(pushURL, "/") {
|
if strings.HasSuffix(url, "/") {
|
||||||
pushURL = pushURL[:len(pushURL)-1]
|
url = url[:len(url)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.Contains(job, "/") {
|
if strings.Contains(job, "/") {
|
||||||
return fmt.Errorf("job contains '/': %s", job)
|
err = fmt.Errorf("job contains '/': %s", job)
|
||||||
}
|
}
|
||||||
urlComponents := []string{url.QueryEscape(job)}
|
|
||||||
for ln, lv := range grouping {
|
return &Pusher{
|
||||||
if !model.LabelName(ln).IsValid() {
|
error: err,
|
||||||
return fmt.Errorf("grouping label has invalid name: %s", ln)
|
url: url,
|
||||||
|
job: job,
|
||||||
|
grouping: map[string]string{},
|
||||||
|
gatherers: prometheus.Gatherers{reg},
|
||||||
|
registerer: reg,
|
||||||
|
client: &http.Client{},
|
||||||
}
|
}
|
||||||
if strings.Contains(lv, "/") {
|
}
|
||||||
return fmt.Errorf("value of grouping label %s contains '/': %s", ln, lv)
|
|
||||||
|
// Push collects/gathers all metrics from all Collectors and Gatherers added to
|
||||||
|
// this Pusher. Then, it pushes them to the Pushgateway configured while
|
||||||
|
// creating this Pusher, using the configured job name and any added grouping
|
||||||
|
// labels as grouping key. All previously pushed metrics with the same job and
|
||||||
|
// other grouping labels will be replaced with the metrics pushed by this
|
||||||
|
// call. (It uses HTTP method “PUT” to push to the Pushgateway.)
|
||||||
|
//
|
||||||
|
// Push returns the first error encountered by any method call (including this
|
||||||
|
// one) in the lifetime of the Pusher.
|
||||||
|
func (p *Pusher) Push() error {
|
||||||
|
return p.push("PUT")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add works like push, but only previously pushed metrics with the same name
|
||||||
|
// (and the same job and other grouping labels) will be replaced. (It uses HTTP
|
||||||
|
// method “POST” to push to the Pushgateway.)
|
||||||
|
func (p *Pusher) Add() error {
|
||||||
|
return p.push("POST")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gatherer adds a Gatherer to the Pusher, from which metrics will be gathered
|
||||||
|
// to push them to the Pushgateway. The gathered metrics must not contain a job
|
||||||
|
// label of their own.
|
||||||
|
//
|
||||||
|
// For convenience, this method returns a pointer to the Pusher itself.
|
||||||
|
func (p *Pusher) Gatherer(g prometheus.Gatherer) *Pusher {
|
||||||
|
p.gatherers = append(p.gatherers, g)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collector adds a Collector to the Pusher, from which metrics will be
|
||||||
|
// collected to push them to the Pushgateway. The collected metrics must not
|
||||||
|
// contain a job label of their own.
|
||||||
|
//
|
||||||
|
// For convenience, this method returns a pointer to the Pusher itself.
|
||||||
|
func (p *Pusher) Collector(c prometheus.Collector) *Pusher {
|
||||||
|
if p.error == nil {
|
||||||
|
p.error = p.registerer.Register(c)
|
||||||
}
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Grouping adds a label pair to the grouping key of the Pusher, replacing any
|
||||||
|
// previously added label pair with the same label name. Note that setting any
|
||||||
|
// labels in the grouping key that are already contained in the metrics to push
|
||||||
|
// will lead to an error.
|
||||||
|
//
|
||||||
|
// For convenience, this method returns a pointer to the Pusher itself.
|
||||||
|
//
|
||||||
|
// Note that until https://github.com/prometheus/pushgateway/issues/97 is
|
||||||
|
// resolved, this method does not allow a “/” character in the label value.
|
||||||
|
func (p *Pusher) Grouping(name, value string) *Pusher {
|
||||||
|
if p.error == nil {
|
||||||
|
if !model.LabelName(name).IsValid() {
|
||||||
|
p.error = fmt.Errorf("grouping label has invalid name: %s", name)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
if strings.Contains(value, "/") {
|
||||||
|
p.error = fmt.Errorf("value of grouping label %s contains '/': %s", name, value)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
p.grouping[name] = value
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// HostnameGrouping adds a label pair with “instance” as the name and the
|
||||||
|
// current hostname as the value to the grouping key of the Pusher. If the
|
||||||
|
// hostname cannot be determined, “unknown” is used as the label value. For
|
||||||
|
// further implications of setting a grouping key, see the Grouping method.
|
||||||
|
//
|
||||||
|
// For convenience, this method returns a pointer to the Pusher itself.
|
||||||
|
func (p *Pusher) HostnameGrouping() *Pusher {
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
hostname = "unknown"
|
||||||
|
}
|
||||||
|
p.grouping["instance"] = hostname
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client sets a custom HTTP client for the Pusher. For convenience, this method
|
||||||
|
// returns a pointer to the Pusher itself.
|
||||||
|
func (p *Pusher) Client(c *http.Client) *Pusher {
|
||||||
|
p.client = c
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// BasicAuth configures the Pusher to use HTTP Basic Authentication with the
|
||||||
|
// provided username and password. For convenience, this method returns a
|
||||||
|
// pointer to the Pusher itself.
|
||||||
|
func (p *Pusher) BasicAuth(username, password string) *Pusher {
|
||||||
|
p.useBasicAuth = true
|
||||||
|
p.username = username
|
||||||
|
p.password = password
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pusher) push(method string) error {
|
||||||
|
if p.error != nil {
|
||||||
|
return p.error
|
||||||
|
}
|
||||||
|
urlComponents := []string{url.QueryEscape(p.job)}
|
||||||
|
for ln, lv := range p.grouping {
|
||||||
urlComponents = append(urlComponents, ln, lv)
|
urlComponents = append(urlComponents, ln, lv)
|
||||||
}
|
}
|
||||||
pushURL = fmt.Sprintf("%s/metrics/job/%s", pushURL, strings.Join(urlComponents, "/"))
|
pushURL := fmt.Sprintf("%s/metrics/job/%s", p.url, strings.Join(urlComponents, "/"))
|
||||||
|
|
||||||
mfs, err := g.Gather()
|
mfs, err := p.gatherers.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -107,7 +222,7 @@ func push(job string, grouping map[string]string, pushURL string, g prometheus.G
|
||||||
if l.GetName() == "job" {
|
if l.GetName() == "job" {
|
||||||
return fmt.Errorf("pushed metric %s (%s) already contains a job label", mf.GetName(), m)
|
return fmt.Errorf("pushed metric %s (%s) already contains a job label", mf.GetName(), m)
|
||||||
}
|
}
|
||||||
if _, ok := grouping[l.GetName()]; ok {
|
if _, ok := p.grouping[l.GetName()]; ok {
|
||||||
return fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"pushed metric %s (%s) already contains grouping label %s",
|
"pushed metric %s (%s) already contains grouping label %s",
|
||||||
mf.GetName(), m, l.GetName(),
|
mf.GetName(), m, l.GetName(),
|
||||||
|
@ -121,8 +236,11 @@ func push(job string, grouping map[string]string, pushURL string, g prometheus.G
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if p.useBasicAuth {
|
||||||
|
req.SetBasicAuth(p.username, p.password)
|
||||||
|
}
|
||||||
req.Header.Set(contentTypeHeader, string(expfmt.FmtProtoDelim))
|
req.Header.Set(contentTypeHeader, string(expfmt.FmtProtoDelim))
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := p.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -133,40 +251,3 @@ func push(job string, grouping map[string]string, pushURL string, g prometheus.G
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collectors works like FromGatherer, but it does not use a Gatherer. Instead,
|
|
||||||
// it collects from the provided collectors directly. It is a convenient way to
|
|
||||||
// push only a few metrics.
|
|
||||||
func Collectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
|
|
||||||
return pushCollectors(job, grouping, url, "PUT", collectors...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddCollectors works like AddFromGatherer, but it does not use a Gatherer.
|
|
||||||
// Instead, it collects from the provided collectors directly. It is a
|
|
||||||
// convenient way to push only a few metrics.
|
|
||||||
func AddCollectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
|
|
||||||
return pushCollectors(job, grouping, url, "POST", collectors...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func pushCollectors(job string, grouping map[string]string, url, method string, collectors ...prometheus.Collector) error {
|
|
||||||
r := prometheus.NewRegistry()
|
|
||||||
for _, collector := range collectors {
|
|
||||||
if err := r.Register(collector); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return push(job, grouping, url, r, method)
|
|
||||||
}
|
|
||||||
|
|
||||||
// HostnameGroupingKey returns a label map with the only entry
|
|
||||||
// {instance="<hostname>"}. This can be conveniently used as the grouping
|
|
||||||
// parameter if metrics should be pushed with the hostname as label. The
|
|
||||||
// returned map is created upon each call so that the caller is free to add more
|
|
||||||
// labels to the map.
|
|
||||||
func HostnameGroupingKey() map[string]string {
|
|
||||||
hostname, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
return map[string]string{"instance": "unknown"}
|
|
||||||
}
|
|
||||||
return map[string]string{"instance": hostname}
|
|
||||||
}
|
|
||||||
|
|
|
@ -98,12 +98,16 @@ func TestPush(t *testing.T) {
|
||||||
}
|
}
|
||||||
wantBody := buf.Bytes()
|
wantBody := buf.Bytes()
|
||||||
|
|
||||||
// PushCollectors, all good.
|
// Push some Collectors, all good.
|
||||||
if err := Collectors("testjob", HostnameGroupingKey(), pgwOK.URL, metric1, metric2); err != nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
HostnameGrouping().
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if lastMethod != "PUT" {
|
if lastMethod != "PUT" {
|
||||||
t.Error("want method PUT for PushCollectors, got", lastMethod)
|
t.Error("want method PUT for Push, got", lastMethod)
|
||||||
}
|
}
|
||||||
if bytes.Compare(lastBody, wantBody) != 0 {
|
if bytes.Compare(lastBody, wantBody) != 0 {
|
||||||
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
||||||
|
@ -112,12 +116,15 @@ func TestPush(t *testing.T) {
|
||||||
t.Error("unexpected path:", lastPath)
|
t.Error("unexpected path:", lastPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushAddCollectors, with nil grouping, all good.
|
// Add some Collectors, with nil grouping, all good.
|
||||||
if err := AddCollectors("testjob", nil, pgwOK.URL, metric1, metric2); err != nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Add(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if lastMethod != "POST" {
|
if lastMethod != "POST" {
|
||||||
t.Error("want method POST for PushAddCollectors, got", lastMethod)
|
t.Error("want method POST for Add, got", lastMethod)
|
||||||
}
|
}
|
||||||
if bytes.Compare(lastBody, wantBody) != 0 {
|
if bytes.Compare(lastBody, wantBody) != 0 {
|
||||||
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
||||||
|
@ -126,8 +133,11 @@ func TestPush(t *testing.T) {
|
||||||
t.Error("unexpected path:", lastPath)
|
t.Error("unexpected path:", lastPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushCollectors with a broken PGW.
|
// Push some Collectors with a broken PGW.
|
||||||
if err := Collectors("testjob", nil, pgwErr.URL, metric1, metric2); err == nil {
|
if err := New(pgwErr.URL, "testjob").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err == nil {
|
||||||
t.Error("push to broken Pushgateway succeeded")
|
t.Error("push to broken Pushgateway succeeded")
|
||||||
} else {
|
} else {
|
||||||
if got, want := err.Error(), "unexpected status code 500 while pushing to "+pgwErr.URL+"/metrics/job/testjob: fake error\n"; got != want {
|
if got, want := err.Error(), "unexpected status code 500 while pushing to "+pgwErr.URL+"/metrics/job/testjob: fake error\n"; got != want {
|
||||||
|
@ -135,22 +145,40 @@ func TestPush(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushCollectors with invalid grouping or job.
|
// Push some Collectors with invalid grouping or job.
|
||||||
if err := Collectors("testjob", map[string]string{"foo": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
Grouping("foo", "bums").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err == nil {
|
||||||
t.Error("push with grouping contained in metrics succeeded")
|
t.Error("push with grouping contained in metrics succeeded")
|
||||||
}
|
}
|
||||||
if err := Collectors("test/job", nil, pgwErr.URL, metric1, metric2); err == nil {
|
if err := New(pgwOK.URL, "test/job").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err == nil {
|
||||||
t.Error("push with invalid job value succeeded")
|
t.Error("push with invalid job value succeeded")
|
||||||
}
|
}
|
||||||
if err := Collectors("testjob", map[string]string{"foo/bar": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
Grouping("foobar", "bu/ms").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err == nil {
|
||||||
t.Error("push with invalid grouping succeeded")
|
t.Error("push with invalid grouping succeeded")
|
||||||
}
|
}
|
||||||
if err := Collectors("testjob", map[string]string{"foo-bar": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
Grouping("foo-bar", "bums").
|
||||||
|
Collector(metric1).
|
||||||
|
Collector(metric2).
|
||||||
|
Push(); err == nil {
|
||||||
t.Error("push with invalid grouping succeeded")
|
t.Error("push with invalid grouping succeeded")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Push registry, all good.
|
// Push registry, all good.
|
||||||
if err := FromGatherer("testjob", HostnameGroupingKey(), pgwOK.URL, reg); err != nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
HostnameGrouping().
|
||||||
|
Gatherer(reg).
|
||||||
|
Push(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if lastMethod != "PUT" {
|
if lastMethod != "PUT" {
|
||||||
|
@ -160,12 +188,16 @@ func TestPush(t *testing.T) {
|
||||||
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushAdd registry, all good.
|
// Add registry, all good.
|
||||||
if err := AddFromGatherer("testjob", map[string]string{"a": "x", "b": "y"}, pgwOK.URL, reg); err != nil {
|
if err := New(pgwOK.URL, "testjob").
|
||||||
|
Grouping("a", "x").
|
||||||
|
Grouping("b", "y").
|
||||||
|
Gatherer(reg).
|
||||||
|
Add(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if lastMethod != "POST" {
|
if lastMethod != "POST" {
|
||||||
t.Error("want method POSTT for PushAdd, got", lastMethod)
|
t.Error("want method POST for Add, got", lastMethod)
|
||||||
}
|
}
|
||||||
if bytes.Compare(lastBody, wantBody) != 0 {
|
if bytes.Compare(lastBody, wantBody) != 0 {
|
||||||
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
t.Errorf("got body %v, want %v", lastBody, wantBody)
|
||||||
|
|
Loading…
Reference in New Issue