// Copyright 2014 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Copyright (c) 2013, The Prometheus Authors // All rights reserved. // // Use of this source code is governed by a BSD-style license that can be found // in the LICENSE file. package prometheus import ( "bytes" "compress/gzip" "errors" "fmt" "io" "net/http" "net/url" "sort" "strings" "sync" "github.com/golang/protobuf/proto" "github.com/prometheus/common/expfmt" dto "github.com/prometheus/client_model/go" "github.com/prometheus/client_golang/prometheus/promhttp" ) // Handler returns an instrumented HTTP handler for the default Prometheus // registry. It is already instrumented with InstrumentHandler (using // "prometheus" as handler name). Usually the handler is used to handle the // "/metrics" endpoint. func Handler() http.Handler { return promhttp.InstrumentHandler( "prometheus", promhttp.Handler(registry.Default), ) } // UninstrumentedHandler works in the same way as Handler, but the returned HTTP // handler is not instrumented. This is useful if no instrumentation is desired // (for whatever reason) or if the instrumentation has to happen with a // different handler name (or with a different instrumentation approach // altogether). See the InstrumentHandler example. func UninstrumentedHandler() http.Handler { return promhttp.Handler(registry.Default) } // Register registers a new Collector to be included in metrics collection with // the default registry. It returns an error if the descriptors provided by the // Collector are invalid or if they - in combination with descriptors of already // registered Collectors - do not fulfill the consistency and uniqueness // criteria described in the Desc documentation. // // Do not register the same Collector multiple times concurrently. (Registering // the same Collector twice would result in an error anyway, but on top of that, // it is not safe to do so concurrently.) func Register(c Collector) error { return registry.Default.Register(c) } // MustRegister works like Register but panics where Register would have // returned an error. func MustRegister(c Collector) { registry.MustRegister(registry.Default, c) } // RegisterOrGet works like Register but does not return an error if a Collector // is registered that equals a previously registered Collector. (Two Collectors // are considered equal if their Describe method yields the same set of // descriptors.) Instead, the previously registered Collector is returned (which // is helpful if the new and previously registered Collectors are equal but not // identical, i.e. not pointers to the same object). // // As for Register, it is still not safe to call RegisterOrGet with the same // Collector multiple times concurrently. func RegisterOrGet(c Collector) (Collector, error) { return registry.RegisterOrGet(registry.Default, c) } // MustRegisterOrGet works like Register but panics where RegisterOrGet would // have returned an error. func MustRegisterOrGet(c Collector) Collector { return registry.MustRegisterOrGet(registry.Default, c) } // Unregister unregisters the Collector that equals the Collector passed in as // an argument. (Two Collectors are considered equal if their Describe method // yields the same set of descriptors.) The function returns whether a Collector // was unregistered. func Unregister(c Collector) bool { return registry.Default.Unregister(c) } // TODO: Move out from here. // encoder is a function that writes a dto.MetricFamily to an io.Writer in a // certain encoding. It returns the number of bytes written and any error // encountered. Note that pbutil.WriteDelimited and pbutil.MetricFamilyToText // are encoders. type encoder func(io.Writer, *dto.MetricFamily) (int, error) type registry struct { mtx sync.RWMutex collectorsByID map[uint64]Collector // ID is a hash of the descIDs. descIDs map[uint64]struct{} dimHashesByName map[string]uint64 bufPool chan *bytes.Buffer metricFamilyPool chan *dto.MetricFamily metricPool chan *dto.Metric metricFamilyInjectionHook func() []*dto.MetricFamily panicOnCollectError, collectChecksEnabled bool } func (r *registry) Register(c Collector) (Collector, error) { descChan := make(chan *Desc, capDescChan) go func() { c.Describe(descChan) close(descChan) }() newDescIDs := map[uint64]struct{}{} newDimHashesByName := map[string]uint64{} var collectorID uint64 // Just a sum of all desc IDs. var duplicateDescErr error r.mtx.Lock() defer r.mtx.Unlock() // Coduct various tests... for desc := range descChan { // Is the descriptor valid at all? if desc.err != nil { return c, fmt.Errorf("descriptor %s is invalid: %s", desc, desc.err) } // Is the descID unique? // (In other words: Is the fqName + constLabel combination unique?) if _, exists := r.descIDs[desc.id]; exists { duplicateDescErr = fmt.Errorf("descriptor %s already exists with the same fully-qualified name and const label values", desc) } // If it is not a duplicate desc in this collector, add it to // the collectorID. (We allow duplicate descs within the same // collector, but their existence must be a no-op.) if _, exists := newDescIDs[desc.id]; !exists { newDescIDs[desc.id] = struct{}{} collectorID += desc.id } // Are all the label names and the help string consistent with // previous descriptors of the same name? // First check existing descriptors... if dimHash, exists := r.dimHashesByName[desc.fqName]; exists { if dimHash != desc.dimHash { return nil, fmt.Errorf("a previously registered descriptor with the same fully-qualified name as %s has different label names or a different help string", desc) } } else { // ...then check the new descriptors already seen. if dimHash, exists := newDimHashesByName[desc.fqName]; exists { if dimHash != desc.dimHash { return nil, fmt.Errorf("descriptors reported by collector have inconsistent label names or help strings for the same fully-qualified name, offender is %s", desc) } } else { newDimHashesByName[desc.fqName] = desc.dimHash } } } // Did anything happen at all? if len(newDescIDs) == 0 { return nil, errors.New("collector has no descriptors") } if existing, exists := r.collectorsByID[collectorID]; exists { return existing, errAlreadyReg } // If the collectorID is new, but at least one of the descs existed // before, we are in trouble. if duplicateDescErr != nil { return nil, duplicateDescErr } // Only after all tests have passed, actually register. r.collectorsByID[collectorID] = c for hash := range newDescIDs { r.descIDs[hash] = struct{}{} } for name, dimHash := range newDimHashesByName { r.dimHashesByName[name] = dimHash } return c, nil } func (r *registry) RegisterOrGet(m Collector) (Collector, error) { existing, err := r.Register(m) if err != nil && err != errAlreadyReg { return nil, err } return existing, nil } func (r *registry) Unregister(c Collector) bool { descChan := make(chan *Desc, capDescChan) go func() { c.Describe(descChan) close(descChan) }() descIDs := map[uint64]struct{}{} var collectorID uint64 // Just a sum of the desc IDs. TODO: should be fnv on its own for desc := range descChan { if _, exists := descIDs[desc.id]; !exists { collectorID += desc.id descIDs[desc.id] = struct{}{} } } r.mtx.RLock() if _, exists := r.collectorsByID[collectorID]; !exists { r.mtx.RUnlock() return false } r.mtx.RUnlock() r.mtx.Lock() defer r.mtx.Unlock() delete(r.collectorsByID, collectorID) for id := range descIDs { delete(r.descIDs, id) } // dimHashesByName is left untouched as those must be consistent // throughout the lifetime of a program. return true } func (r *registry) Push(job, instance, pushURL, method string) error { if !strings.Contains(pushURL, "://") { pushURL = "http://" + pushURL } if strings.HasSuffix(pushURL, "/") { pushURL = pushURL[:len(pushURL)-1] } pushURL = fmt.Sprintf("%s/metrics/jobs/%s", pushURL, url.QueryEscape(job)) if instance != "" { pushURL += "/instances/" + url.QueryEscape(instance) } buf := r.getBuf() defer r.giveBuf(buf) if err := r.writePB(expfmt.NewEncoder(buf, expfmt.FmtProtoDelim)); err != nil { if r.panicOnCollectError { panic(err) } return err } req, err := http.NewRequest(method, pushURL, buf) if err != nil { return err } req.Header.Set(contentTypeHeader, DelimitedTelemetryContentType) resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != 202 { return fmt.Errorf("unexpected status code %d while pushing to %s", resp.StatusCode, pushURL) } return nil } func (r *registry) ServeHTTP(w http.ResponseWriter, req *http.Request) { contentType := expfmt.Negotiate(req.Header) buf := r.getBuf() defer r.giveBuf(buf) writer, encoding := decorateWriter(req, buf) if err := r.writePB(expfmt.NewEncoder(writer, contentType)); err != nil { if r.panicOnCollectError { panic(err) } http.Error(w, "An error has occurred:\n\n"+err.Error(), http.StatusInternalServerError) return } if closer, ok := writer.(io.Closer); ok { closer.Close() } header := w.Header() header.Set(contentTypeHeader, string(contentType)) header.Set(contentLengthHeader, fmt.Sprint(buf.Len())) if encoding != "" { header.Set(contentEncodingHeader, encoding) } w.Write(buf.Bytes()) } func (r *registry) writePB(encoder expfmt.Encoder) error { var metricHashes map[uint64]struct{} if r.collectChecksEnabled { metricHashes = make(map[uint64]struct{}) } metricChan := make(chan Metric, capMetricChan) wg := sync.WaitGroup{} r.mtx.RLock() metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) // Scatter. // (Collectors could be complex and slow, so we call them all at once.) wg.Add(len(r.collectorsByID)) go func() { wg.Wait() close(metricChan) }() for _, collector := range r.collectorsByID { go func(collector Collector) { defer wg.Done() collector.Collect(metricChan) }(collector) } r.mtx.RUnlock() // Drain metricChan in case of premature return. defer func() { for _ = range metricChan { } }() // Gather. for metric := range metricChan { // This could be done concurrently, too, but it required locking // of metricFamiliesByName (and of metricHashes if checks are // enabled). Most likely not worth it. desc := metric.Desc() metricFamily, ok := metricFamiliesByName[desc.fqName] if !ok { metricFamily = r.getMetricFamily() defer r.giveMetricFamily(metricFamily) metricFamily.Name = proto.String(desc.fqName) metricFamily.Help = proto.String(desc.help) metricFamiliesByName[desc.fqName] = metricFamily } dtoMetric := r.getMetric() defer r.giveMetric(dtoMetric) if err := metric.Write(dtoMetric); err != nil { // TODO: Consider different means of error reporting so // that a single erroneous metric could be skipped // instead of blowing up the whole collection. return fmt.Errorf("error collecting metric %v: %s", desc, err) } switch { case metricFamily.Type != nil: // Type already set. We are good. case dtoMetric.Gauge != nil: metricFamily.Type = dto.MetricType_GAUGE.Enum() case dtoMetric.Counter != nil: metricFamily.Type = dto.MetricType_COUNTER.Enum() case dtoMetric.Summary != nil: metricFamily.Type = dto.MetricType_SUMMARY.Enum() case dtoMetric.Untyped != nil: metricFamily.Type = dto.MetricType_UNTYPED.Enum() case dtoMetric.Histogram != nil: metricFamily.Type = dto.MetricType_HISTOGRAM.Enum() default: return fmt.Errorf("empty metric collected: %s", dtoMetric) } if r.collectChecksEnabled { if err := r.checkConsistency(metricFamily, dtoMetric, desc, metricHashes); err != nil { return err } } metricFamily.Metric = append(metricFamily.Metric, dtoMetric) } if r.metricFamilyInjectionHook != nil { for _, mf := range r.metricFamilyInjectionHook() { existingMF, exists := metricFamiliesByName[mf.GetName()] if !exists { metricFamiliesByName[mf.GetName()] = mf if r.collectChecksEnabled { for _, m := range mf.Metric { if err := r.checkConsistency(mf, m, nil, metricHashes); err != nil { return err } } } continue } for _, m := range mf.Metric { if r.collectChecksEnabled { if err := r.checkConsistency(existingMF, m, nil, metricHashes); err != nil { return err } } existingMF.Metric = append(existingMF.Metric, m) } } } // Now that MetricFamilies are all set, sort their Metrics // lexicographically by their label values. for _, mf := range metricFamiliesByName { sort.Sort(metricSorter(mf.Metric)) } // Write out MetricFamilies sorted by their name. names := make([]string, 0, len(metricFamiliesByName)) for name := range metricFamiliesByName { names = append(names, name) } sort.Strings(names) for _, name := range names { if err := encoder.Encode(metricFamiliesByName[name]); err != nil { return err } } return nil } func (r *registry) checkConsistency(metricFamily *dto.MetricFamily, dtoMetric *dto.Metric, desc *Desc, metricHashes map[uint64]struct{}) error { // Type consistency with metric family. if metricFamily.GetType() == dto.MetricType_GAUGE && dtoMetric.Gauge == nil || metricFamily.GetType() == dto.MetricType_COUNTER && dtoMetric.Counter == nil || metricFamily.GetType() == dto.MetricType_SUMMARY && dtoMetric.Summary == nil || metricFamily.GetType() == dto.MetricType_HISTOGRAM && dtoMetric.Histogram == nil || metricFamily.GetType() == dto.MetricType_UNTYPED && dtoMetric.Untyped == nil { return fmt.Errorf( "collected metric %s %s is not a %s", metricFamily.GetName(), dtoMetric, metricFamily.GetType(), ) } // Is the metric unique (i.e. no other metric with the same name and the same label values)? h := hashNew() h = hashAdd(h, metricFamily.GetName()) h = hashAddByte(h, separatorByte) // Make sure label pairs are sorted. We depend on it for the consistency // check. Label pairs must be sorted by contract. But the point of this // method is to check for contract violations. So we better do the sort // now. sort.Sort(LabelPairSorter(dtoMetric.Label)) for _, lp := range dtoMetric.Label { h = hashAdd(h, lp.GetValue()) h = hashAddByte(h, separatorByte) } if _, exists := metricHashes[h]; exists { return fmt.Errorf( "collected metric %s %s was collected before with the same name and label values", metricFamily.GetName(), dtoMetric, ) } metricHashes[h] = struct{}{} if desc == nil { return nil // Nothing left to check if we have no desc. } // Desc consistency with metric family. if metricFamily.GetName() != desc.fqName { return fmt.Errorf( "collected metric %s %s has name %q but should have %q", metricFamily.GetName(), dtoMetric, metricFamily.GetName(), desc.fqName, ) } if metricFamily.GetHelp() != desc.help { return fmt.Errorf( "collected metric %s %s has help %q but should have %q", metricFamily.GetName(), dtoMetric, metricFamily.GetHelp(), desc.help, ) } // Is the desc consistent with the content of the metric? lpsFromDesc := make([]*dto.LabelPair, 0, len(dtoMetric.Label)) lpsFromDesc = append(lpsFromDesc, desc.constLabelPairs...) for _, l := range desc.variableLabels { lpsFromDesc = append(lpsFromDesc, &dto.LabelPair{ Name: proto.String(l), }) } if len(lpsFromDesc) != len(dtoMetric.Label) { return fmt.Errorf( "labels in collected metric %s %s are inconsistent with descriptor %s", metricFamily.GetName(), dtoMetric, desc, ) } sort.Sort(LabelPairSorter(lpsFromDesc)) for i, lpFromDesc := range lpsFromDesc { lpFromMetric := dtoMetric.Label[i] if lpFromDesc.GetName() != lpFromMetric.GetName() || lpFromDesc.Value != nil && lpFromDesc.GetValue() != lpFromMetric.GetValue() { return fmt.Errorf( "labels in collected metric %s %s are inconsistent with descriptor %s", metricFamily.GetName(), dtoMetric, desc, ) } } r.mtx.RLock() // Remaining checks need the read lock. defer r.mtx.RUnlock() // Is the desc registered? if _, exist := r.descIDs[desc.id]; !exist { return fmt.Errorf( "collected metric %s %s with unregistered descriptor %s", metricFamily.GetName(), dtoMetric, desc, ) } return nil } func (r *registry) getBuf() *bytes.Buffer { select { case buf := <-r.bufPool: return buf default: return &bytes.Buffer{} } } func (r *registry) giveBuf(buf *bytes.Buffer) { buf.Reset() select { case r.bufPool <- buf: default: } } func (r *registry) getMetricFamily() *dto.MetricFamily { select { case mf := <-r.metricFamilyPool: return mf default: return &dto.MetricFamily{} } } func (r *registry) giveMetricFamily(mf *dto.MetricFamily) { mf.Reset() select { case r.metricFamilyPool <- mf: default: } } func (r *registry) getMetric() *dto.Metric { select { case m := <-r.metricPool: return m default: return &dto.Metric{} } } func (r *registry) giveMetric(m *dto.Metric) { m.Reset() select { case r.metricPool <- m: default: } } func newRegistry() *registry { return ®istry{ collectorsByID: map[uint64]Collector{}, descIDs: map[uint64]struct{}{}, dimHashesByName: map[string]uint64{}, bufPool: make(chan *bytes.Buffer, numBufs), metricFamilyPool: make(chan *dto.MetricFamily, numMetricFamilies), metricPool: make(chan *dto.Metric, numMetrics), } } // decorateWriter wraps a writer to handle gzip compression if requested. It // returns the decorated writer and the appropriate "Content-Encoding" header // (which is empty if no compression is enabled). func decorateWriter(request *http.Request, writer io.Writer) (io.Writer, string) { header := request.Header.Get(acceptEncodingHeader) parts := strings.Split(header, ",") for _, part := range parts { part := strings.TrimSpace(part) if part == "gzip" || strings.HasPrefix(part, "gzip;") { return gzip.NewWriter(writer), "gzip" } } return writer, "" } type metricSorter []*dto.Metric func (s metricSorter) Len() int { return len(s) } func (s metricSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s metricSorter) Less(i, j int) bool { if len(s[i].Label) != len(s[j].Label) { // This should not happen. The metrics are // inconsistent. However, we have to deal with the fact, as // people might use custom collectors or metric family injection // to create inconsistent metrics. So let's simply compare the // number of labels in this case. That will still yield // reproducible sorting. return len(s[i].Label) < len(s[j].Label) } for n, lp := range s[i].Label { vi := lp.GetValue() vj := s[j].Label[n].GetValue() if vi != vj { return vi < vj } } // We should never arrive here. Multiple metrics with the same // label set in the same scrape will lead to undefined ingestion // behavior. However, as above, we have to provide stable sorting // here, even for inconsistent metrics. So sort equal metrics // by their timestamp, with missing timestamps (implying "now") // coming last. if s[i].TimestampMs == nil { return false } if s[j].TimestampMs == nil { return true } return s[i].GetTimestampMs() < s[j].GetTimestampMs() }